Distributed Systems 2022-11-10

6.824 Lab1 : MapReduce


lab1:MapReduce

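Paper analysis: MapReduce reading notes

Map function: takes one input key/value pair and produces a set of intermediate key/value pairs.

The MapReduce library groups together all intermediate values that share the same intermediate key I and passes them to the Reduce function.

Reduce function: merges these values. Typically each Reduce invocation produces zero or one output value (for example, if Map recorded the occurrences of the word "my" as 1,1,1,1,1,1,1, Reduce merges them into the single value 7).

The intermediate values are supplied to Reduce through an iterator, which makes it possible to handle sets of values that are too large to fit in memory.

Once the user has filled in a specification object that conforms to the MapReduce model, the job can be invoked.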
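As a concrete illustration of the two functions, here is a minimal word-count sketch modeled on the kind of Map and Reduce the lab's plugins provide. The `KeyValue` type matches the one defined later in worker.go; the file name and input text in `main` are made up for the demo.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is one intermediate pair emitted by Map.
type KeyValue struct {
	Key   string
	Value string
}

// Map splits the contents into words and emits ("word", "1") for each one.
// The filename argument is unused in word count.
func Map(filename string, contents string) []KeyValue {
	notLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, notLetter)
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every value emitted for one key and returns how many there were.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kva := Map("demo.txt", "my cat and my dog")
	fmt.Println(len(kva), Reduce("my", []string{"1", "1"}))
}
```

Reduce only counts the values here, which is why Map can emit the constant "1" for every word.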

Workflow

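1. The MapReduce library called by the user program first splits the input files into M pieces, each typically 16MB to 64MB (the size can be controlled through an optional parameter). The user program then starts many copies of the program on a cluster of machines.

2. One of these copies is special: the master. The other copies are workers, which are assigned work by the master. There are M map tasks and R reduce tasks to assign; the master picks an idle worker and gives it one map task or one reduce task.

3. A worker that is assigned a map task reads the corresponding input piece, parses key/value pairs out of it, and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by Map are buffered in memory.

4. The buffered pairs are split into R regions by the partitioning function and periodically written to local disk. The locations of these pairs on local disk are passed back to the master, which forwards them to the reduce workers. (For each completed map task the master stores the locations and sizes of the R intermediate file regions. It receives these updates as map tasks complete and pushes them incrementally to the workers that have reduce tasks in progress.)

5. When a reduce worker receives these locations from the master, it uses RPC to read the buffered data from the local disks of the map workers. Once it has read all of the intermediate data, it sorts the data by key so that all occurrences of the same key are grouped together (the sorted, merged data is the input of the reduce task). Sorting is necessary because many different keys usually map to the same reduce task. If the intermediate data is too large to sort in memory, an external sort is used.

6. The reduce worker iterates over the sorted intermediate data and, for each unique intermediate key, passes the key and the corresponding set of intermediate values to the user-defined Reduce function. The output of Reduce is appended to the output file of this reduce partition (in the end there is one file per partition).

7. When all map and reduce tasks have completed, the master wakes up the user program. At this point the MapReduce call in the user program returns.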

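Fault tolerance

  1. Worker failure: the master pings every worker periodically.

    Every map task of a failed map worker, including the completed ones, is reset and rerun on another worker, because map output lives on the failed machine's local disk and can no longer be reached. (The output locations of the failed execution are no longer handed out by the master; the disk space stays occupied but is never read.) A reduce worker that has not yet read the data from the failed worker is notified and reads from the new worker instead.

    Completed reduce tasks of a failed reduce worker do not have to be redone, because their output is already in the global file system; another worker finishes the remaining ones.

  2. Master failure: not described in detail.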

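Lab analysis

What needs to be done

The code goes in the mr package:

  • coordinator.go: the equivalent of the master in the paper

    • Design the Coordinator and Task structs

    • MakeCoordinator(): creates and initializes a coordinator

    • RPC handlers for the worker to call; in this lab the handler is named GetTask()

    • Done(): reports whether the whole job has finished

  • rpc.go

    • Design the RPC request (args) and response (reply) structs

  • worker.go

    • Worker(): the actual work of the worker, such as requesting tasks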

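Running and checking the results

Work under Linux, and run the test script from the src/main directory.

After every change to code in the mr package, rebuild the plugin: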

$ go build -buildmode=plugin ../mrapps/wc.go

Run the test script:

sh test-mr.sh

rules

  • The map phase should divide the intermediate keys into buckets for nReduce reduce tasks, where nReduce is the argument that main/mrmaster.go passes to MakeMaster().

    The map side of the worker must distribute the key/value pairs produced from the file into nReduce buckets by key (the bucket is chosen by taking ihash(key) modulo nReduce).

  • The worker implementation should put the output of the X'th reduce task in the file mr-out-X.

    The worker writes the final reduce output into nReduce files named mr-out-X.

  • A mr-out-X file should contain one line per Reduce function output. The line should be generated with the Go "%v %v" format, called with the key and value. Have a look in main/mrsequential.go for the line commented "this is the correct format". The test script will fail if your implementation deviates too much from this format.

    Each result in a final mr-out-X file must occupy one line and be written as a key/value pair. The correct format can be seen in main/mrsequential.go.

  • You can modify mr/worker.go, mr/master.go, and mr/rpc.go. You can temporarily modify other files for testing, but make sure your code works with the original versions; we'll test with the original versions.

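    Only the three files mr/worker.go, mr/master.go, and mr/rpc.go under mr may be modified (in the version of the lab used below, the master file is coordinator.go).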

  • The worker should put intermediate Map output in files in the current directory, where your worker can later read them as input to Reduce tasks.

    The worker should put the map output in the current directory so that the reduce side can read those files later.

  • main/mrmaster.go expects mr/master.go to implement a Done() method that returns true when the MapReduce job is completely finished; at that point, mrmaster.go will exit.

    The whole job counts as finished once the Done() method in mr's master.go returns true.

  • When the job is completely finished, the worker processes should exit. A simple way to implement this is to use the return value from call(): if the worker fails to contact the master, it can assume that the master has exited because the job is done, and so the worker can terminate too. Depending on your design, you might also find it helpful to have a "please exit" pseudo-task that the master can give to workers.

    A worker can decide whether to exit based on whether call() can still reach the master; designing an exit pseudo-task that the master hands to workers also helps.
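The bucketing and output rules above can be sketched in a few lines. `ihash` is the hash from the lab skeleton (it appears in worker.go later in these notes); `partition`, `outputName`, and `outputLine` are hypothetical helper names introduced only for this illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type KeyValue struct{ Key, Value string }

// ihash is the hash the lab skeleton provides in worker.go.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// partition (hypothetical helper) splits Map output into nReduce buckets,
// so that every pair with the same key lands in the same bucket.
func partition(kva []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kva {
		r := ihash(kv.Key) % nReduce
		buckets[r] = append(buckets[r], kv)
	}
	return buckets
}

// outputName and outputLine (hypothetical helpers) follow the mr-out-X
// naming rule and the "%v %v" line format.
func outputName(reduceIndex int) string   { return fmt.Sprintf("mr-out-%d", reduceIndex) }
func outputLine(key, value string) string { return fmt.Sprintf("%v %v\n", key, value) }

func main() {
	b := partition([]KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}}, 3)
	fmt.Print(len(b), " ", outputName(2), " ", outputLine("a", "2"))
}
```

Because the bucket depends only on the key, one reduce task sees every value for the keys it owns.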

hints

  • One way to get started is to modify mr/worker.go's Worker() to send an RPC to the master asking for a task. Then modify the master to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

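    Have Worker() in worker.go send an RPC to the master asking for a task; the master replies with a map task that has not started yet; then write the worker code that reads the file and calls the application's Map function, as mrsequential.go does.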

  • This lab relies on the workers sharing a file system. That's straightforward when all workers run on the same machine, but would require a global filesystem like GFS if the workers ran on different machines.

    Running on multiple machines would require a shared file system such as GFS.

  • A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.

    The intermediate files that map writes for reduce to consume can be named mr-X-Y, where X is the map task number and Y is the reduce task number.

  • The worker's map task code will need a way to store intermediate key/value pairs in files in a way that can be correctly read back during reduce tasks. One possibility is to use Go's encoding/json package. To write key/value pairs to a JSON file:

    The key/value pairs that map extracts from the files must be saved so that reduce can use them in the next step; Go's encoding/json is one option.

    Writing:

      enc := json.NewEncoder(file)
      for _, kv := ... {
        err := enc.Encode(&kv)

    Reading:

      dec := json.NewDecoder(file)
      for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
          break
        }
        kva = append(kva, kv)
      }
  • The map part of your worker can use the ihash(key) function (in worker.go) to pick the reduce task for a given key.

    In the map part, ihash(key) decides which reduce task handles a given key.

  • You can steal some code from mrsequential.go for reading Map input files, for sorting intermedate key/value pairs between the Map and Reduce, and for storing Reduce output in files.

    Code from mrsequential.go can be reused for reading the map input files, for sorting between Map and Reduce, and for storing the Reduce output in files.

  • The master, as an RPC server, will be concurrent; don't forget to lock shared data.

    The master, acting as an RPC server, handles requests concurrently, so remember to lock shared data.

  • Use Go's race detector, with go build -race and go run -race. test-mr.sh has a comment that shows you how to enable the race detector for the tests.

    The -race flag can be used to detect data races.

  • Workers will sometimes need to wait, e.g. reduces can't start until the last map has finished. One possibility is for workers to periodically ask the master for work, sleeping with time.Sleep() between each request. Another possibility is for the relevant RPC handler in the master to have a loop that waits, either with time.Sleep() or sync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting won't prevent the master from processing other RPCs.

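    Workers often have to wait; for example, reduces cannot start until every map has finished. A worker can call time.Sleep() between task requests, or the RPC handler on the master can wait in a loop. Each RPC handler runs in its own thread, so one waiting handler does not block the others.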

  • The master can't reliably distinguish between crashed workers, workers that are alive but have stalled for some reason, and workers that are executing but too slowly to be useful. The best you can do is have the master wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the master wait for ten seconds; after that the master should assume the worker has died (of course, it might not have).

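    The master cannot tell crashed workers apart from workers that are alive but stalled, or running but too slow. So it waits for a while, then gives up and reassigns the task to another worker. In this lab each worker gets ten seconds; after that the master should assume the worker has died.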

  • To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.

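    To keep partially written files left by a crash from being observed, write under a temporary name and rename the output file only after the task has written it completely.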

  • test-mr.sh runs all the processes in the sub-directory mr-tmp, so if something goes wrong and you want to look at intermediate or output files, look there.

    The test script runs everything in the mr-tmp directory, so intermediate and output files end up there.
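The temporary-file hint can be sketched as follows. This is one possible shape, not the lab's required code: `writeAtomically` and `demo` are hypothetical names, and the sketch uses `os.CreateTemp`, the Go 1.16+ successor to `ioutil.TempFile`. The temporary file is created in the same directory as the final file, since a rename across file systems can fail.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomically (hypothetical helper) writes data to a temp file in the
// same directory as name, then renames it into place. Readers see either
// no file or the complete file, never a partial one.
func writeAtomically(name string, data []byte) error {
	tmp, err := os.CreateTemp(filepath.Dir(name), "mr-tmp-*")
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return os.Rename(tmp.Name(), name)
}

// demo writes a file into a scratch directory and reads it back.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "mr-demo-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	name := filepath.Join(dir, "mr-0-0")
	if err := writeAtomically(name, []byte("hello")); err != nil {
		return "", err
	}
	got, err := os.ReadFile(name)
	return string(got), err
}

func main() {
	fmt.Println(demo())
}
```

Checking the error from `os.Rename` matters: if it is ignored and the rename fails, the task looks finished although its output file does not exist.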

Final design notes

Data structures

First the data structures. Coordinator corresponds to the master in the paper.

In coordinator.go:

type Coordinator struct {
    // Your definitions here.
    Filenames []string
​
    NReduce int
    NMap    int
​
    MapTasks    chan Task
    ReduceTasks chan Task
​
    RunningTasks []Task
}
​
type Task struct {
    //map/reduce/none/exit
    TaskType  string
    Filename  string
    TaskIndex int
    // start time while in RunningTasks; after more than 10s the task is removed
    StartTime time.Time
    WorkId    int
}
​

There is also a mutex that ensures only one goroutine at a time accesses the shared variables:

var mu sync.Mutex

The request and reply structs in rpc.go:

type AskingArgs struct {
    AskingType string    //ask finish
    WorkId int           //unique id
    TaskType  string     //map reduce
    TaskIndex int
}
​
type Reply struct {
    XTask   Task
    NReduce int
    NMap    int
}

Initialization

The MakeCoordinator function in coordinator.go

It creates a coordinator and adds every map and reduce task to the channels.

// create a Coordinator.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{
        MapTasks:    make(chan Task, len(files)),
        ReduceTasks: make(chan Task, nReduce),
        NReduce:     nReduce,
        NMap:        len(files),
        Filenames:   files,
    }
​
    // add the map tasks to the channel
    for i, file := range files {
        task := Task{TaskType: "map", Filename: file, TaskIndex: i}
        c.MapTasks <- task
    }
​
    // add the reduce tasks to the channel
    for i := 0; i < nReduce; i++ {
        task := Task{TaskType: "reduce", TaskIndex: i}
        c.ReduceTasks <- task
    }
​
    c.server()
    
    ...
    
    return &c

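It also starts a goroutine that checks for timed-out tasks (at the ... in the previous code block). If a task has not finished after 10s, its worker has probably failed.

    // Start a goroutine that checks every 100ms for timed-out tasks. A timeout is taken to mean
    // that the worker has failed, so the task is handed to another worker.
    // This assumption only holds while tasks are small.
    go func() {
        for {
            time.Sleep(time.Second / 10)
            // RunningTasks is shared with the RPC handlers, so hold the lock while scanning it
            mu.Lock()
            for i, task := range c.RunningTasks {
                // put a timed-out task back into its channel
                if time.Now().Sub(task.StartTime).Seconds() >= 10 {
                    if task.TaskType == "map" {
                        c.MapTasks <- task
                    } else if task.TaskType == "reduce" {
                        c.ReduceTasks <- task
                    }
                    // remove the timed-out task from RunningTasks
                    copy(c.RunningTasks[i:], c.RunningTasks[i+1:])
                    c.RunningTasks = c.RunningTasks[:len(c.RunningTasks)-1]
                    break
                }
            }
            mu.Unlock()
        }
    }()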
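The timeout check can also be written as a pure function, which is easier to test than a loop that edits the slice while ranging over it. The sketch below is an alternative shape, not the code used in these notes; `splitExpired` and `demoSplit` are hypothetical names, and the `Task` struct is trimmed to the fields the check needs.

```go
package main

import (
	"fmt"
	"time"
)

type Task struct {
	TaskType  string
	TaskIndex int
	StartTime time.Time
}

// splitExpired (hypothetical helper) returns the tasks still within the
// timeout and the tasks that exceeded it, without mutating the input.
func splitExpired(running []Task, now time.Time, timeout time.Duration) (alive, expired []Task) {
	for _, t := range running {
		if now.Sub(t.StartTime) >= timeout {
			expired = append(expired, t)
		} else {
			alive = append(alive, t)
		}
	}
	return alive, expired
}

// demoSplit runs the check on one stale and one fresh task.
func demoSplit() (aliveCount, expiredCount int) {
	now := time.Now()
	running := []Task{
		{"map", 0, now.Add(-11 * time.Second)}, // started 11s ago: expired
		{"map", 1, now.Add(-2 * time.Second)},  // started 2s ago: still alive
	}
	alive, expired := splitExpired(running, now, 10*time.Second)
	return len(alive), len(expired)
}

func main() {
	fmt.Println(demoSplit())
}
```

In the coordinator, the caller would hold the mutex, replace RunningTasks with the alive slice, and push each expired task back into its channel.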

The Worker function

Overall idea: one big for loop.

Give the worker its own worker ID:

workID, _ := rand.Int(rand.Reader, big.NewInt(100000000))

Sleep before every RPC call:

Enter the for loop:

taskType := "none"
for taskType != "exit" {
// call the RPC once per second
time.Sleep(time.Second)

Set the request's AskingType to asking and fetch a task:

args := AskingArgs{}
args.WorkId = int(workID.Int64())
args.AskingType = "asking"

workID := int(workID.Int64())
reply := Reply{}

// send the task request over RPC
Call(args.WorkId, &args, &reply)
taskType = reply.XTask.TaskType
taskIndex := reply.XTask.TaskIndex

When taskType is map, build the buf buckets and place each kv into one of the NReduce buckets by hash modulo NReduce:

// the task received is a map task
if reply.XTask.TaskType == "map" {
    fileName := reply.XTask.Filename
    // read the file
    file, err := os.Open(fileName)
    if err != nil {
        log.Fatalf("cannot open %v", fileName)
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
        log.Fatalf("cannot read %v", fileName)
    }
    file.Close()
    // run Map on the file contents, then bucket the pairs by key
    kva := mapf(fileName, string(content))
    // initialize the buf buckets
    var buf [][]KeyValue
    for i := 0; i < reply.NReduce; i++ {
        buf = append(buf, []KeyValue{})
    }
    for _, value := range kva {
        // pick one of the NReduce buckets by hash modulo
        hashKey := ihash(value.Key)
        num := hashKey % reply.NReduce
        buf[num] = append(buf[num], value)
    }
    ...

Sort the output:

// sort each bucket
for i := 0; i < reply.NReduce; i++ {
    sort.Sort(ByKey(buf[i]))
}

Write each bucket with json.Encoder to a temporary file named mr-map-*, and rename it once it has been written completely:

// write the buckets to files
for i := 0; i < reply.NReduce; i++ {
    tmpFile, error := ioutil.TempFile("", "mr-map-*")
    if error != nil {
        log.Fatalf("can't open tempFile")
    }
    enc := json.NewEncoder(tmpFile)
    err := enc.Encode(buf[i])
    if err != nil {
        log.Fatalf("fail to encode")
    }
    tmpFile.Close()
    // the file name order is IdMap-ReduceNum
    outFileName := "mr-" + strconv.Itoa(reply.XTask.TaskIndex) + "-" + strconv.Itoa(i)

    // rename only after the file is complete
    os.Rename(tmpFile.Name(), outFileName)
}

When taskType is reduce, use json.Decode to read the files named mr-IdMap-IdReduce, collect their contents in intermediate (a temporary slice of KeyValue), and sort it:

else if reply.XTask.TaskType == "reduce" {
    numMap := reply.NMap
    var intermediate []KeyValue
    for i := 0; i < numMap; i++ {
        // the file name order is again IdMap-IdReduce
        mapFileName := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.XTask.TaskIndex)
        inputFile, err := os.OpenFile(mapFileName, os.O_RDONLY, 0777)
        if err != nil {
            log.Fatalf("can't open reduceFile %v", mapFileName)
        }
        dec := json.NewDecoder(inputFile)
        for {
            var kv []KeyValue
            if err := dec.Decode(&kv); err != nil {
                break
            }
            intermediate = append(intermediate, kv...)
        }
    }
    sort.Sort(ByKey(intermediate))
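The write and read halves have to agree on the format: the map code above encodes each bucket as one JSON array, so the reduce code decodes into a `[]KeyValue`. The round trip below is a small in-memory sketch of that contract using a `bytes.Buffer` instead of files; `encodeBucket`, `decodeBuckets`, and `roundTrip` are hypothetical names. Unlike the loop above, it separates `io.EOF` (normal end of input) from real decoding errors.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

type KeyValue struct {
	Key   string
	Value string
}

// encodeBucket writes the whole bucket as a single JSON array.
func encodeBucket(w io.Writer, bucket []KeyValue) error {
	return json.NewEncoder(w).Encode(bucket)
}

// decodeBuckets reads JSON arrays until the stream ends and concatenates them.
func decodeBuckets(r io.Reader) ([]KeyValue, error) {
	var all []KeyValue
	dec := json.NewDecoder(r)
	for {
		var part []KeyValue
		err := dec.Decode(&part)
		if err == io.EOF {
			return all, nil // clean end of input
		}
		if err != nil {
			return nil, err // corrupted or truncated data
		}
		all = append(all, part...)
	}
}

// roundTrip (hypothetical demo) encodes a bucket and decodes it again.
func roundTrip(in []KeyValue) ([]KeyValue, error) {
	var buf bytes.Buffer
	if err := encodeBucket(&buf, in); err != nil {
		return nil, err
	}
	return decodeBuckets(&buf)
}

func main() {
	out, err := roundTrip([]KeyValue{{"a", "1"}, {"b", "1"}})
	fmt.Println(len(out), err)
}
```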

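Split out each run of key/value pairs that share the same key and write the results to a temporary file mr-reduce-*; once everything has been written, rename it to mr-out-*: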

outFileName := "mr-out-" + strconv.Itoa(reply.XTask.TaskIndex)
tmpFile, error := ioutil.TempFile("", "mr-reduce-*")
if error != nil {
    log.Fatalf("can't open tempFile")
}
​
i := 0
for i < len(intermediate) {
    j := i + 1
    for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
        j++
    }
    values := []string{}
    for k := i; k < j; k++ {
        // collect the values of all pairs that share this key
        values = append(values, intermediate[k].Value)
    }
    output := reducef(intermediate[i].Key, values)
​
    // this is the correct format for each line of Reduce output.
    fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)
​
    i = j
}
tmpFile.Close()
os.Rename(tmpFile.Name(), outFileName)
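The two-index loop above relies on the slice being sorted, so that equal keys are adjacent. Pulled out into a function it looks roughly like this; `Group` and `groupSorted` are hypothetical names used only for this sketch.

```go
package main

import (
	"fmt"
	"sort"
)

type KeyValue struct {
	Key   string
	Value string
}

// Group (hypothetical type) holds one key and every value emitted for it.
type Group struct {
	Key    string
	Values []string
}

// groupSorted walks a slice that is already sorted by key and collects each
// run of equal keys into one Group.
func groupSorted(sorted []KeyValue) []Group {
	var groups []Group
	i := 0
	for i < len(sorted) {
		// advance j to the first element with a different key
		j := i + 1
		for j < len(sorted) && sorted[j].Key == sorted[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, sorted[k].Value)
		}
		groups = append(groups, Group{Key: sorted[i].Key, Values: values})
		i = j
	}
	return groups
}

func main() {
	kva := []KeyValue{{"b", "1"}, {"a", "1"}, {"a", "1"}}
	sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })
	for _, g := range groupSorted(kva) {
		fmt.Printf("%v %v\n", g.Key, len(g.Values))
	}
}
```

Each Group is what one call to reducef receives: the key and all of its values.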

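Set the request's AskingType to finish and send another RPC with Call:

// tell the coordinator over RPC that the task is done
// (args and reply already exist in this scope, so assign instead of redeclaring)
args = AskingArgs{}
args.WorkId = workID
args.AskingType = "finish"
args.TaskType = reply.XTask.TaskType
args.TaskIndex = taskIndex
reply = Reply{}
// send the finish request
Call(workID, &args, &reply)

Modify the Call function so that every Call reaches the Coordinator's GetTask function over RPC: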

func Call(workId int, args *AskingArgs, reply *Reply) {
​
    // fill in the argument(s).
    args.WorkId = workId
​
    ok := call("Coordinator.GetTask", args, reply)
    if !ok {
        fmt.Printf("call failed!\n")
    }
}

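The GetTask method of the coordinator

Overall idea:

Take the lock, defer the unlock, and fill in the initial values:

mu.Lock()
defer mu.Unlock()
reply.NReduce = c.NReduce
reply.NMap = c.NMap

If the request's AskingType is finish, remove the task from RunningTasks:

// a finished task is removed from RunningTasks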
if args.AskingType == "finish" {
    for i, task := range c.RunningTasks {
        if task.TaskType == args.TaskType && task.TaskIndex == args.TaskIndex {
            copy(c.RunningTasks[i:], c.RunningTasks[i+1:])
            c.RunningTasks = c.RunningTasks[:len(c.RunningTasks)-1]
            break
        }
    }
    return nil
}

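If there are still unassigned map tasks:

// otherwise this is a task request, so assign one
// first check whether all map tasks have been handed out
// if map tasks remain, hand one out and move it to RunningTasks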
if len(c.MapTasks) > 0 {
    reply.XTask.TaskType = "map"
    thisTask := <-c.MapTasks
    reply.XTask.TaskIndex = thisTask.TaskIndex
    reply.XTask.StartTime = time.Now()
    reply.XTask.Filename = thisTask.Filename
​
    // record the map task in RunningTasks
    thisTask.StartTime = time.Now()
    thisTask.WorkId = args.WorkId
    c.RunningTasks = append(c.RunningTasks, thisTask)
​
    return nil
}

If a map task is still running, no reduce task may start, so return taskType none:

// while a map task is still running, no reduce task may start
hasMap := false
for _, task := range c.RunningTasks {
    if task.TaskType == "map" {
        hasMap = true
        break
    }
}
if hasMap {
    reply.XTask.TaskType = "none"
    return nil
}

All maps are done. If reduce tasks remain, hand one out and move it to RunningTasks:

// check whether all reduce tasks have been handed out
// if reduce tasks remain, hand one out and move it to RunningTasks
if len(c.ReduceTasks) > 0 {
    reply.XTask.TaskType = "reduce"
    thisTask := <-c.ReduceTasks
    reply.XTask.TaskIndex = thisTask.TaskIndex
    reply.XTask.StartTime = time.Now()
​
    // record the reduce task in RunningTasks
    thisTask.StartTime = time.Now()
    thisTask.WorkId = args.WorkId
    c.RunningTasks = append(c.RunningTasks, thisTask)
​
    return nil
}
​

If only running tasks remain, keep the worker waiting and return taskType none:

// if only running tasks remain, keep the worker waiting
if len(c.MapTasks) > 0 || len(c.ReduceTasks) > 0 || len(c.RunningTasks) > 0 {
    reply.XTask.TaskType = "none"
    return nil
}

If all map and reduce tasks are done, return the please exit task:

// all map and reduce tasks are done: return the please exit task
reply.XTask.TaskType = "exit"
​
return nil
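The order of the checks in GetTask is the whole scheduling policy, so it can be summarized as one pure function over four counters. This is a condensed sketch for illustration, not the handler itself; `decide` and its parameter names are hypothetical.

```go
package main

import "fmt"

// decide (hypothetical helper) mirrors the order of checks in GetTask and
// returns the task type to hand to a worker that asks for work.
func decide(pendingMap, runningMap, pendingReduce, runningReduce int) string {
	switch {
	case pendingMap > 0:
		return "map"
	case runningMap > 0:
		return "none" // reduces must wait until every map has finished
	case pendingReduce > 0:
		return "reduce"
	case runningReduce > 0:
		return "none" // wait: a running reduce may still time out and be reissued
	default:
		return "exit"
	}
}

func main() {
	fmt.Println(decide(1, 0, 3, 0)) // map tasks are still pending
	fmt.Println(decide(0, 1, 3, 0)) // a map is still running
	fmt.Println(decide(0, 0, 3, 0)) // all maps done, reduces pending
	fmt.Println(decide(0, 0, 0, 0)) // nothing left
}
```

Returning "none" rather than "exit" while tasks are still running keeps idle workers around to pick up tasks that time out.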

Finishing the job: Done()

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
    ret := false

    // Your code here.
    // RunningTasks is shared with the RPC handlers, so read it under the lock
    mu.Lock()
    defer mu.Unlock()
    // the job has finished once nothing is pending or running
    if len(c.RunningTasks) == 0 &&
        len(c.MapTasks) == 0 &&
        len(c.ReduceTasks) == 0 {
        log.Println("all tasks finished")
        ret = true
    }
    return ret
}

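Final result (full code)

Version 1: only checks whether a task has exceeded 10s, with no heartbeats (probably the only practical option here)

coordinator.go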

package mr
​
import (
    "log"
    "net"
    "net/http"
    "net/rpc"
    "os"
    "sync"
    "time"
)
​
type Coordinator struct {
    // Your definitions here.
    Filenames []string
​
    NReduce int
    NMap    int
​
    MapTasks    chan Task
    ReduceTasks chan Task
​
    RunningTasks []Task
}
​
type Task struct {
    //map/reduce/none/exit
    TaskType  string
    Filename  string
    TaskIndex int
    // start time while in RunningTasks; after more than 10s the task is removed
    StartTime time.Time
    WorkId    int
}
​
var mu sync.Mutex
​
// Your code here -- RPC handlers for the worker to call.
func (c *Coordinator) GetTask(args *AskingArgs, reply *Reply) error {
    mu.Lock()
    defer mu.Unlock()
    reply.NReduce = c.NReduce
    reply.NMap = c.NMap
​
    // a finished task is removed from RunningTasks
    if args.AskingType == "finish" {
        for i, task := range c.RunningTasks {
            if task.TaskType == args.TaskType && task.TaskIndex == args.TaskIndex {
                copy(c.RunningTasks[i:], c.RunningTasks[i+1:])
                c.RunningTasks = c.RunningTasks[:len(c.RunningTasks)-1]
                break
            }
        }
        return nil
    }
​
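    // otherwise this is a task request, so assign one
    // first check whether all map tasks have been handed out
    // if map tasks remain, hand one out and move it to RunningTasks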
    if len(c.MapTasks) > 0 {
        reply.XTask.TaskType = "map"
        thisTask := <-c.MapTasks
        reply.XTask.TaskIndex = thisTask.TaskIndex
        reply.XTask.StartTime = time.Now()
        reply.XTask.Filename = thisTask.Filename
​
        // record the map task in RunningTasks
        thisTask.StartTime = time.Now()
        thisTask.WorkId = args.WorkId
        c.RunningTasks = append(c.RunningTasks, thisTask)
​
        return nil
    }
​
    // while a map task is still running, no reduce task may start
    hasMap := false
    for _, task := range c.RunningTasks {
        if task.TaskType == "map" {
            hasMap = true
            break
        }
    }
    if hasMap {
        reply.XTask.TaskType = "none"
        return nil
    }
​
    // check whether all reduce tasks have been handed out
    // if reduce tasks remain, hand one out and move it to RunningTasks
    if len(c.ReduceTasks) > 0 {
        reply.XTask.TaskType = "reduce"
        thisTask := <-c.ReduceTasks
        reply.XTask.TaskIndex = thisTask.TaskIndex
        reply.XTask.StartTime = time.Now()
​
        // record the reduce task in RunningTasks
        thisTask.StartTime = time.Now()
        thisTask.WorkId = args.WorkId
        c.RunningTasks = append(c.RunningTasks, thisTask)
​
        return nil
    }
​
    // if only running tasks remain, keep the worker waiting
    if len(c.MapTasks) > 0 || len(c.ReduceTasks) > 0 || len(c.RunningTasks) > 0 {
        reply.XTask.TaskType = "none"
        return nil
    }
​
    // all map and reduce tasks are done: return the please exit task
    reply.XTask.TaskType = "exit"
​
    return nil
}
​
// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    //l, e := net.Listen("tcp", ":1234")
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)
}
​
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
    ret := false
​
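    // Your code here.
    // RunningTasks is shared with the RPC handlers, so read it under the lock
    mu.Lock()
    defer mu.Unlock()
    // the job has finished once nothing is pending or running
    if len(c.RunningTasks) == 0 &&
        len(c.MapTasks) == 0 &&
        len(c.ReduceTasks) == 0 {
        log.Println("all tasks finished")
        ret = true
    }
    return ret
}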
​
// create a Coordinator.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{
        MapTasks:    make(chan Task, len(files)),
        ReduceTasks: make(chan Task, nReduce),
        NReduce:     nReduce,
        NMap:        len(files),
        Filenames:   files,
    }
​
    // add the map tasks to the channel
    for i, file := range files {
        task := Task{TaskType: "map", Filename: file, TaskIndex: i}
        c.MapTasks <- task
    }
​
    // add the reduce tasks to the channel
    for i := 0; i < nReduce; i++ {
        task := Task{TaskType: "reduce", TaskIndex: i}
        c.ReduceTasks <- task
    }
​
    c.server()
​
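    // Start a goroutine that checks every 100ms for timed-out tasks; a timeout is taken to mean the worker has failed.
    // This assumption only holds while tasks are small.
    go func() {
        for {
            time.Sleep(time.Second / 10)
            // RunningTasks is shared with the RPC handlers, so hold the lock while scanning it
            mu.Lock()
            for i, task := range c.RunningTasks {
                // put a timed-out task back into its channel
                if time.Now().Sub(task.StartTime).Seconds() >= 10 {
                    if task.TaskType == "map" {
                        c.MapTasks <- task
                    } else if task.TaskType == "reduce" {
                        c.ReduceTasks <- task
                    }
                    // remove the timed-out task from RunningTasks
                    copy(c.RunningTasks[i:], c.RunningTasks[i+1:])
                    c.RunningTasks = c.RunningTasks[:len(c.RunningTasks)-1]
                    break
                }
            }
            mu.Unlock()
        }
    }()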
    return &c
}
​

rpc.go

package mr
​
//
// RPC definitions.
//
// remember to capitalize all names.
//
​
import (
    "os"
)
import "strconv"
​
// Add your RPC definitions here.
​
type AskingArgs struct {
    AskingType string    //ask finish
    WorkId int           //unique id
    TaskType  string     //map reduce
    TaskIndex int
}
​
type Reply struct {
    XTask   Task
    NReduce int
    NMap    int
}
​
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
    s := "/var/tmp/824-mr-"
    s += strconv.Itoa(os.Getuid())
    return s
}
​

worker.go

package mr
​
import (
    "crypto/rand"
    "encoding/json"
    "fmt"
    "hash/fnv"
    "io/ioutil"
    "log"
    "math/big"
    "net/rpc"
    "os"
    "sort"
    "strconv"
    "time"
)
​
// KeyValue
// Map functions return a slice of KeyValue.
type KeyValue struct {
    Key   string
    Value string
}
​
// ByKey for sorting by key.
type ByKey []KeyValue
​
// Len for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
​
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}
​
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    // Your worker implementation here.
    workID, _ := rand.Int(rand.Reader, big.NewInt(100000000))
​
    // uncomment to send the Example RPC to the coordinator.
    // CallExample()
    taskType := "none"
    for taskType != "exit" {
        // call the RPC once per second
        time.Sleep(time.Second)
        args := AskingArgs{}
        args.WorkId = int(workID.Int64())
        args.AskingType = "asking"
​
        workID := int(workID.Int64())
        reply := Reply{}
​
        // send the task request over RPC
        Call(args.WorkId, &args, &reply)
        taskType = reply.XTask.TaskType
        taskIndex := reply.XTask.TaskIndex
​
        // the task received is a map task
        if reply.XTask.TaskType == "map" {
            fileName := reply.XTask.Filename
            // read the file
            file, err := os.Open(fileName)
            if err != nil {
                log.Fatalf("cannot open %v", fileName)
            }
            content, err := ioutil.ReadAll(file)
            if err != nil {
                log.Fatalf("cannot read %v", fileName)
            }
            file.Close()
​
            // run Map on the file contents, then bucket the pairs by key
            kva := mapf(fileName, string(content))
            // initialize the buf buckets
            var buf [][]KeyValue
            for i := 0; i < reply.NReduce; i++ {
                buf = append(buf, []KeyValue{})
            }
            for _, value := range kva {
                // pick one of the NReduce buckets by hash modulo
                hashKey := ihash(value.Key)
                num := hashKey % reply.NReduce
                buf[num] = append(buf[num], value)
            }
            // sort each bucket
            for i := 0; i < reply.NReduce; i++ {
                sort.Sort(ByKey(buf[i]))
            }
            // write the buckets to files
            for i := 0; i < reply.NReduce; i++ {
                tmpFile, error := ioutil.TempFile("", "mr-map-*")
                if error != nil {
                    log.Fatalf("can't open tempFile")
                }
                enc := json.NewEncoder(tmpFile)
                err := enc.Encode(buf[i])
                if err != nil {
                    log.Fatalf("fail to encode")
                }
                tmpFile.Close()
                // the file name order is IdMap-ReduceNum
                outFileName := "mr-" + strconv.Itoa(reply.XTask.TaskIndex) + "-" + strconv.Itoa(i)

                // rename only after the file is complete
                os.Rename(tmpFile.Name(), outFileName)
            }
​
        } else if reply.XTask.TaskType == "reduce" {
            numMap := reply.NMap
            var intermediate []KeyValue
            for i := 0; i < numMap; i++ {
                // the file name order is again IdMap-IdReduce
                mapFileName := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.XTask.TaskIndex)
                inputFile, err := os.OpenFile(mapFileName, os.O_RDONLY, 0777)
                if err != nil {
                    log.Fatalf("can't open reduceFile %v", mapFileName)
                }
                dec := json.NewDecoder(inputFile)
                for {
                    var kv []KeyValue
                    if err := dec.Decode(&kv); err != nil {
                        break
                    }
                    intermediate = append(intermediate, kv...)
                }
            }
            sort.Sort(ByKey(intermediate))
​
            outFileName := "mr-out-" + strconv.Itoa(reply.XTask.TaskIndex)
            tmpFile, error := ioutil.TempFile("", "mr-reduce-*")
            if error != nil {
                log.Fatalf("can't open tempFile")
            }
​
            i := 0
            for i < len(intermediate) {
                j := i + 1
                for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
                    j++
                }
                values := []string{}
                for k := i; k < j; k++ {
                    // collect the values of all pairs that share this key
                    values = append(values, intermediate[k].Value)
                }
                output := reducef(intermediate[i].Key, values)
​
                // this is the correct format for each line of Reduce output.
                fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)
​
                i = j
            }
            tmpFile.Close()
            os.Rename(tmpFile.Name(), outFileName)
        }
        // tell the coordinator over RPC that the task is done
        args = AskingArgs{}
        args.WorkId = workID
        args.AskingType = "finish"
        args.TaskType = reply.XTask.TaskType
        args.TaskIndex = taskIndex
        reply = Reply{}
        // send the finish request
        Call(workID, &args, &reply)
    }
}
func Call(workId int, args *AskingArgs, reply *Reply) {
​
    // fill in the argument(s).
    args.WorkId = workId
​
    ok := call("Coordinator.GetTask", args, reply)
    if !ok {
        fmt.Printf("call failed!\n")
    }
}
​
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
    sockname := coordinatorSock()
    //log.Println("sockname is " + sockname)
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()
​
    err = c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }
​
    fmt.Println(err)
    return false
}
​

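Version 2: with a makeshift heartbeat (has problems)

The version above already passes all tests. The motivation for a heartbeat is that this lab is only a small job; with a large workload, "the task did not finish within a fixed time" cannot be the only signal for a crash, because a worker that dies early would go unnoticed for a long time. Hence the heartbeat.

The paper has the master ping every worker periodically, but that does not seem feasible with the RPC setup this lab provides.

The idea here is for the worker to call CallHeartBeat over RPC every so often, which updates the latest heartbeat time in the Coordinator's WorkerStatus; a worker whose heartbeat has not been updated for more than 10s is considered dead. This should run in a goroutine, but stopping the goroutine that sends the heartbeats seemed difficult here, as did reading the current TaskType from it. (The HeartBeat method can only refresh the last-update time, and if MakeCoordinator() removes failed workers, it has no easy way to tell the CallHeartBeat goroutine to stop.)

Saving this here as a checkpoint for now.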
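One common way to stop such a goroutine is a done channel combined with a time.Ticker. The sketch below shows that pattern under stated assumptions: `startHeartbeat` and `demoHeartbeat` are hypothetical names, and `send` stands in for whatever the worker would call to issue the heartbeat RPC. It is not the code from these notes.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// startHeartbeat (hypothetical helper) calls send every interval until the
// returned stop function is called. stop blocks until the goroutine has
// exited and must be called at most once.
func startHeartbeat(interval time.Duration, send func()) (stop func()) {
	done := make(chan struct{})
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				send()
			case <-done:
				return
			}
		}
	}()
	return func() {
		close(done)
		<-exited
	}
}

// demoHeartbeat counts beats before and after stopping the goroutine.
func demoHeartbeat() (before, after int64) {
	var beats int64
	stop := startHeartbeat(5*time.Millisecond, func() { atomic.AddInt64(&beats, 1) })
	time.Sleep(100 * time.Millisecond)
	stop()
	before = atomic.LoadInt64(&beats)
	time.Sleep(30 * time.Millisecond)
	after = atomic.LoadInt64(&beats)
	return before, after
}

func main() {
	before, after := demoHeartbeat()
	fmt.Println(before > 0, before == after)
}
```

In the worker, stop would be called when the task type becomes exit, so the heartbeat goroutine does not need to read TaskType itself.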

coordinator

Performance is somewhat lower.

package mr
​
import (
    "log"
    "net"
    "net/http"
    "net/rpc"
    "os"
    "sync"
    "time"
)
​
​
type Coordinator struct {
    // Your definitions here.
    Filenames []string
​
    NReduce int
    NMap    int
​
    MapTasks    chan Task
    ReduceTasks chan Task
​
    RunningTasks []Task
    // status of each worker
    WorkerStatuses []WorkerStatus
}
​
type Task struct {
    //map/reduce/none/exit
    TaskType  string
    Filename  string
    TaskIndex int
    // start time while in RunningTasks; after more than 10s the task is removed
    StartTime time.Time
    WorkId    int
}
​
type WorkerStatus struct {
    // time of the last heartbeat
    // the worker is removed after more than 10s without one
    LastHearten time.Time
    WorkerId    int
}
​
var mu sync.Mutex
​
// Your code here -- RPC handlers for the worker to call.
func (c *Coordinator) GetTask(args *AskingArgs, reply *Reply) error {
    mu.Lock()
    defer mu.Unlock()
    reply.NReduce = c.NReduce
    reply.NMap = c.NMap
​
    //log.Println("worker" + strconv.Itoa(args.WorkId) + " sent a heartbeat, request type " + args.AskingType)
    // a finished task is removed from RunningTasks
    if args.AskingType == "finish" {
        for i, task := range c.RunningTasks {
            if task.TaskType == args.TaskType && task.TaskIndex == args.TaskIndex {
                copy(c.RunningTasks[i:], c.RunningTasks[i+1:])
                c.RunningTasks = c.RunningTasks[:len(c.RunningTasks)-1]
                break
            }
        }
        return nil
    }
​
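    // otherwise this is a task request, so assign one
    // first check whether all map tasks have been handed out
    // if map tasks remain, hand one out and move it to RunningTasks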
    if len(c.MapTasks) > 0 {
        reply.XTask.TaskType = "map"
        thisTask := <-c.MapTasks
        reply.XTask.TaskIndex = thisTask.TaskIndex
        reply.XTask.StartTime = time.Now()
        reply.XTask.Filename = thisTask.Filename
​
        // record the map task in RunningTasks
        thisTask.StartTime = time.Now()
        thisTask.WorkId = args.WorkId
        c.RunningTasks = append(c.RunningTasks, thisTask)
​
        return nil
    }
​
    // while a map task is still running, no reduce task may start
    hasMap := false
    for _, task := range c.RunningTasks {
        if task.TaskType == "map" {
            hasMap = true
            break
        }
    }
    if hasMap {
        reply.XTask.TaskType = "none"
        return nil
    }
​
    // check whether all reduce tasks have been handed out
    // if reduce tasks remain, hand one out and move it to RunningTasks
    if len(c.ReduceTasks) > 0 {
        reply.XTask.TaskType = "reduce"
        thisTask := <-c.ReduceTasks
        reply.XTask.TaskIndex = thisTask.TaskIndex
        reply.XTask.StartTime = time.Now()
​
        // record the start time and the owner, then track the task in RunningTasks
        thisTask.StartTime = time.Now()
        thisTask.WorkId = args.WorkId
        c.RunningTasks = append(c.RunningTasks, thisTask)
​
        return nil
    }
​
    // nothing left to hand out, but some tasks are still running: tell the worker to wait
    if len(c.MapTasks) > 0 || len(c.ReduceTasks) > 0 || len(c.RunningTasks) > 0 {
        reply.XTask.TaskType = "none"
        return nil
    }
​
    // every map and reduce task is done: reply with an exit task
    // and remove this worker from WorkerStatuses
    reply.XTask.TaskType = "exit"
    for i, worker := range c.WorkerStatuses {
        if worker.WorkerId == args.WorkId {
            copy(c.WorkerStatuses[i:], c.WorkerStatuses[i+1:])
            c.WorkerStatuses = c.WorkerStatuses[:len(c.WorkerStatuses)-1]
            // stop here: the slice was just modified while being ranged over
            break
        }
    }
    return nil
}
​
func (c *Coordinator) HeartBeat(args *AskingArgs, reply *Reply) error {
    mu.Lock()
    defer mu.Unlock()
    // refresh the heartbeat time of a known worker;
    // index into the slice, because the range variable is only a copy
    hasWorker := false
    for i := range c.WorkerStatuses {
        if c.WorkerStatuses[i].WorkerId == args.WorkId {
            c.WorkerStatuses[i].LastHearten = time.Now()
            hasWorker = true
            break
        }
    }

    // first heartbeat from a worker that has not been seen before
    if !hasWorker {
        worker := WorkerStatus{}
        worker.WorkerId = args.WorkId
        worker.LastHearten = time.Now()
        c.WorkerStatuses = append(c.WorkerStatuses, worker)
    }

    return nil
}
​
// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    //l, e := net.Listen("tcp", ":1234")
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)
}
​
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
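    ret := false

    // the job has finished once nothing is queued or running
    // and every worker has been told to exit;
    // take the lock because the RPC handlers modify these fields
    mu.Lock()
    defer mu.Unlock()
    if len(c.RunningTasks) == 0 &&
        len(c.MapTasks) == 0 &&
        len(c.ReduceTasks) == 0 &&
        len(c.WorkerStatuses) == 0 {
        log.Println("all tasks finished")
        ret = true
    }
    return ret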
}
​
// create a Coordinator.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{
        MapTasks:    make(chan Task, len(files)),
        ReduceTasks: make(chan Task, nReduce),
        NReduce:     nReduce,
        NMap:        len(files),
        Filenames:   files,
    }
​
    // queue the map tasks on the channel
    for i, file := range files {
        task := Task{TaskType: "map", Filename: file, TaskIndex: i}
        c.MapTasks <- task
    }
​
    // queue the reduce tasks on the channel
    for i := 0; i < nReduce; i++ {
        task := Task{TaskType: "reduce", TaskIndex: i}
        c.ReduceTasks <- task
    }
​
    c.server()
​
    // background goroutine: every 100ms, look for timed-out tasks
    // and for workers whose heartbeat has stopped
    go func() {
        for {
            time.Sleep(time.Second / 10)
            // RunningTasks and WorkerStatuses are shared with the RPC handlers
            mu.Lock()
            for i, task := range c.RunningTasks {
                // a task that has run for 10s or more goes back on its channel
                if time.Since(task.StartTime).Seconds() >= 10 {
                    if task.TaskType == "map" {
                        c.MapTasks <- task
                    } else if task.TaskType == "reduce" {
                        c.ReduceTasks <- task
                    }
                    // drop the timed-out task from RunningTasks
                    copy(c.RunningTasks[i:], c.RunningTasks[i+1:])
                    c.RunningTasks = c.RunningTasks[:len(c.RunningTasks)-1]
                    break
                }
            }

            for i, status := range c.WorkerStatuses {
                if time.Since(status.LastHearten).Seconds() >= 10 {
                    // a crashed worker is simply removed
                    copy(c.WorkerStatuses[i:], c.WorkerStatuses[i+1:])
                    c.WorkerStatuses = c.WorkerStatuses[:len(c.WorkerStatuses)-1]
                    break
                }
            }
            mu.Unlock()
        }
    }()
    return &c
}
​

rpc

package mr
​
//
// RPC definitions.
//
// remember to capitalize all names.
//
​
import (
    "os"
    "strconv"
)
​
// Add your RPC definitions here.
​
type AskingArgs struct {
    AskingType string // "asking" or "finish"
    WorkId     int    // unique worker id
    TaskType   string // type of the task being reported: "map" or "reduce"
    TaskIndex  int
}
​
type Reply struct {
    XTask   Task
    NReduce int
    NMap    int
}
​
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
    s := "/var/tmp/824-mr-"
    s += strconv.Itoa(os.Getuid())
    return s
}
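
To see how these messages travel, here is a minimal sketch of one request/reply round trip. It is an illustration only: `demoCoordinator` and `askOnce` are hypothetical names, the structs are trimmed copies of the ones above, and the sketch talks over an in-memory `net.Pipe` instead of the UNIX-domain socket the lab uses.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Trimmed copies of the types above, so the sketch compiles on its own.
type Task struct {
	TaskType  string
	TaskIndex int
}

type AskingArgs struct {
	AskingType string
	WorkId     int
	TaskType   string
	TaskIndex  int
}

type Reply struct {
	XTask   Task
	NReduce int
}

// demoCoordinator is a hypothetical stand-in for the real Coordinator.
type demoCoordinator struct{}

// GetTask hands out map task 3 for "asking" and leaves the reply empty for "finish".
func (d *demoCoordinator) GetTask(args *AskingArgs, reply *Reply) error {
	if args.AskingType == "finish" {
		return nil
	}
	reply.XTask = Task{TaskType: "map", TaskIndex: 3}
	reply.NReduce = 10
	return nil
}

// askOnce sends one request through an in-memory connection and returns the reply.
func askOnce(args AskingArgs) (Reply, error) {
	srv := rpc.NewServer()
	if err := srv.RegisterName("Coordinator", &demoCoordinator{}); err != nil {
		return Reply{}, err
	}
	clientConn, serverConn := net.Pipe()
	go srv.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var reply Reply
	err := client.Call("Coordinator.GetTask", &args, &reply)
	return reply, err
}

func main() {
	reply, err := askOnce(AskingArgs{AskingType: "asking", WorkId: 1})
	fmt.Println(reply.XTask.TaskType, reply.XTask.TaskIndex, reply.NReduce, err) // prints: map 3 10 <nil>

	reply, err = askOnce(AskingArgs{AskingType: "finish", WorkId: 1, TaskType: "map", TaskIndex: 3})
	fmt.Println(reply.XTask.TaskType == "", err) // prints: true <nil>
}
```

All field names are capitalized because `net/rpc` encodes messages with `encoding/gob`, which only transmits exported fields.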
​

worker

package mr
​
import (
    "crypto/rand"
    "encoding/json"
    "fmt"
    "hash/fnv"
    "io/ioutil"
    "log"
    "math/big"
    "net/rpc"
    "os"
    "sort"
    "strconv"
    "time"
)
​
// KeyValue
// Map functions return a slice of KeyValue.
type KeyValue struct {
    Key   string
    Value string
}
​
// ByKey for sorting by key.
type ByKey []KeyValue
​
// Len for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
​
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}
​
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    // Your worker implementation here.
    // pick a random id that identifies this worker to the coordinator
    n, err := rand.Int(rand.Reader, big.NewInt(100000000))
    if err != nil {
        log.Fatalf("cannot generate worker id: %v", err)
    }
    workID := int(n.Int64())

    // uncomment to send the Example RPC to the coordinator.
    // CallExample()
    taskType := "none"
    for taskType != "exit" {
        // poll the coordinator once per second
        time.Sleep(time.Second)
        args := AskingArgs{}
        args.WorkId = workID
        args.AskingType = "asking"

        reply := Reply{}

        // ask for a task
        CallTask(args.WorkId, &args, &reply)
        taskType = reply.XTask.TaskType
        taskIndex := reply.XTask.TaskIndex
​
        if reply.XTask.TaskType == "map" {
            fileName := reply.XTask.Filename
            // read the input file
            file, err := os.Open(fileName)
            if err != nil {
                log.Fatalf("cannot open %v", fileName)
            }
            content, err := ioutil.ReadAll(file)
            if err != nil {
                log.Fatalf("cannot read %v", fileName)
            }
            file.Close()
​
            // run the user's Map function, then spread the resulting pairs
            // across the buckets by key hash
            kva := mapf(fileName, string(content))
            // one bucket per reduce task
            var buf [][]KeyValue
            for i := 0; i < reply.NReduce; i++ {
                buf = append(buf, []KeyValue{})
            }
            for _, value := range kva {
                hashKey := ihash(value.Key)
                num := hashKey % reply.NReduce
                buf[num] = append(buf[num], value)
            }
            for i := 0; i < reply.NReduce; i++ {
                sort.Sort(ByKey(buf[i]))
            }
​
            CallHeartBeat(workID, &args, &reply)
​
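            // write each bucket to its own intermediate file
            for i := 0; i < reply.NReduce; i++ {
                tmpFile, err := ioutil.TempFile("", "mr-map-*")
                if err != nil {
                    log.Fatalf("can't open tempFile")
                }
                enc := json.NewEncoder(tmpFile)
                if err := enc.Encode(buf[i]); err != nil {
                    log.Fatalf("fail to encode")
                }
                tmpFile.Close()
                // intermediate files are named mr-<map index>-<reduce index>
                outFileName := "mr-" + strconv.Itoa(reply.XTask.TaskIndex) + "-" + strconv.Itoa(i)

                // rename only after the write is complete, and do not ignore a failed rename
                if err := os.Rename(tmpFile.Name(), outFileName); err != nil {
                    log.Fatalf("cannot rename %v to %v", tmpFile.Name(), outFileName)
                }
            }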
​
        } else if reply.XTask.TaskType == "reduce" {
            numMap := reply.NMap
            var intermediate []KeyValue
            for i := 0; i < numMap; i++ {
                // input files follow the same mr-<map index>-<reduce index> naming
                mapFileName := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.XTask.TaskIndex)
                inputFile, err := os.OpenFile(mapFileName, os.O_RDONLY, 0777)
                if err != nil {
                    log.Fatalf("can't open reduceFile %v", mapFileName)
                }
                dec := json.NewDecoder(inputFile)
                for {
                    var kv []KeyValue
                    if err := dec.Decode(&kv); err != nil {
                        break
                    }
                    intermediate = append(intermediate, kv...)
                }
                // close the file once it has been fully decoded
                inputFile.Close()
            }
            sort.Sort(ByKey(intermediate))
​
            CallHeartBeat(workID, &args, &reply)
​
            outFileName := "mr-out-" + strconv.Itoa(reply.XTask.TaskIndex)
            tmpFile, err := ioutil.TempFile("", "mr-reduce-*")
            if err != nil {
                log.Fatalf("can't open tempFile")
            }
​
            i := 0
            for i < len(intermediate) {
                j := i + 1
                for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
                    j++
                }
                values := []string{}
                for k := i; k < j; k++ {
                    // collect the values of all pairs that share this key
                    values = append(values, intermediate[k].Value)
                }
                output := reducef(intermediate[i].Key, values)
​
                // this is the correct format for each line of Reduce output.
                fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)
​
                i = j
            }
            tmpFile.Close()
            if err := os.Rename(tmpFile.Name(), outFileName); err != nil {
                log.Fatalf("cannot rename %v to %v", tmpFile.Name(), outFileName)
            }
        }
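        // report the finished task to the coordinator over RPC;
        // new names are needed because args and reply already exist in this scope
        finishArgs := AskingArgs{}
        finishArgs.WorkId = workID
        finishArgs.AskingType = "finish"
        finishArgs.TaskType = reply.XTask.TaskType
        finishArgs.TaskIndex = taskIndex
        finishReply := Reply{}
        // send the finish request
        CallTask(workID, &finishArgs, &finishReply)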
    }
}
func CallTask(workId int, args *AskingArgs, reply *Reply) {
​
    // fill in the argument(s).
    args.WorkId = workId
​
    ok := call("Coordinator.GetTask", args, reply)
    if !ok {
        fmt.Printf("call failed!\n")
    }
}
​
func CallHeartBeat(workId int, args *AskingArgs, reply *Reply) {
​
    // fill in the argument(s).
    args.WorkId = workId
​
    ok := call("Coordinator.HeartBeat", args, reply)
    if !ok {
        fmt.Printf("call failed!\n")
    }
}
​
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
    sockname := coordinatorSock()
    //log.Println("sockname is " + sockname)
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()
​
    err = c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }
​
    fmt.Println(err)
    return false
}
​

