6.824 Lab1 : MapReduce

Map function: takes an input key/value pair and produces a set of intermediate key/value pairs.
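As a concrete illustration of the Map contract described above, here is a minimal word-count sketch. It follows the function shapes used later in these notes (`mapf func(string, string) []KeyValue` and `reducef func(string, []string) string`); the bodies are an illustrative assumption rather than the lab's reference code.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is one intermediate pair emitted by Map.
type KeyValue struct {
	Key   string
	Value string
}

// Map splits the contents into words and emits ("word", "1") once per occurrence.
func Map(filename string, contents string) []KeyValue {
	isSeparator := func(r rune) bool { return !unicode.IsLetter(r) }
	kva := []KeyValue{}
	for _, w := range strings.FieldsFunc(contents, isSeparator) {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every value emitted for one key and returns the count.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kva := Map("example.txt", "a b a")
	fmt.Println(len(kva), Reduce("a", []string{"1", "1"}))
}
```

Map emits one pair per word occurrence, and Reduce only has to count how many values arrived for a key.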






1. The MapReduce library called by the user program first splits the input files into M pieces, with piece sizes typically starting from 16MB







pairs, then passes each key/value pair to the user-defined Map function, which generates and outputs the intermediate key/value


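4. The buffered key/value pairs are divided into R regions by the partitioning function and periodically written to local disk. The locations of these buffered pairs on local disk are passed back to the master, which is responsible for forwarding them to the Reduce workers. (The master stores the sizes and locations of the R intermediate file regions produced by each Map task. When a Map task completes, the master receives the updated locations and sizes, and pushes this information incrementally to the Reduce tasks that are in progress.)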

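5. When a Reduce worker receives the storage locations from the master, it uses RPC to read the buffered data from the disks of the hosts running the Map workers. Once the Reduce worker has read all the intermediate data, it sorts the data by key so that entries with the same key are grouped together (the merged file produced by the sort is the input of the reduce task). Because many different keys map to the same Reduce task, the sort is required.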


6. The Reduce worker iterates over the sorted intermediate data and handles each unique intermediate key in turn.
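The partitioning in step 4 can be sketched as a hash of the key taken modulo R. The example below is an assumption-laden illustration: `partition` and `split` are hypothetical helper names, and FNV-1a is used only because the lab's `ihash` helper is built on it.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// KeyValue is one intermediate pair.
type KeyValue struct{ Key, Value string }

// partition returns the region index in [0, r) for a key.
func partition(key string, r int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()&0x7fffffff) % r
}

// split distributes pairs into r regions; equal keys always share a region.
func split(kva []KeyValue, r int) [][]KeyValue {
	regions := make([][]KeyValue, r)
	for _, kv := range kva {
		p := partition(kv.Key, r)
		regions[p] = append(regions[p], kv)
	}
	return regions
}

func main() {
	regions := split([]KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}}, 3)
	for i, region := range regions {
		fmt.Println("region", i, "holds", len(region), "pairs")
	}
}
```

Because the region depends only on the key, every pair for a given key ends up with the same Reduce task, which is what makes the per-key grouping in steps 5 and 6 possible.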






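  1. Worker failure: the master pings every worker periodically.

    All tasks of a failed map worker are reset and rerun on another map worker (the failed map tasks' output locations are no longer reported to the master, so the corresponding disk storage, although still occupied, is never accessed). Any reduce worker that has not yet read from the failed worker is notified to read from the new one.

    Tasks already completed by a failed reduce worker do not need to be redone; another worker finishes the remaining ones.

  2. Master failure: not described in detail.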
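The worker-failure rule above can be written down as a small state reset. This is only a sketch of the rule as summarized in these notes; the `State` values, the `Task` fields, and `resetForFailedWorker` are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// State is the scheduling state of a task.
type State int

const (
	Idle State = iota
	InProgress
	Completed
)

// Task records what kind of task it is and which worker holds it.
type Task struct {
	Kind   string // "map" or "reduce"
	State  State
	Worker int // id of the assigned worker, -1 if none
}

// resetForFailedWorker makes the failed worker's tasks schedulable again.
// Map tasks are reset even when completed, because their output lives on the
// failed worker's local disk; completed reduce tasks are left alone.
func resetForFailedWorker(tasks []Task, failed int) int {
	reset := 0
	for i := range tasks {
		t := &tasks[i]
		if t.Worker != failed || t.State == Idle {
			continue
		}
		if t.Kind == "map" || t.State == InProgress {
			t.State = Idle
			t.Worker = -1
			reset++
		}
	}
	return reset
}

func main() {
	tasks := []Task{
		{"map", Completed, 1},
		{"map", InProgress, 2},
		{"reduce", Completed, 1},
		{"reduce", InProgress, 1},
	}
	fmt.Println("tasks reset:", resetForFailedWorker(tasks, 1))
}
```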



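The code to write lives in the mr package:

  • coordinator.go: plays the role of the master in the paper

    • Design the Coordinator and Task structs

    • MakeCoordinator(): initializes and creates a coordinator

    • RPC handlers for the worker to call; in this lab the handler is named GetTask()

    • Done(): reports whether the whole job has finished

  • rpc.go

    • Design the structs for the RPC request args and response reply

  • worker.go

    • Worker(): implements the actual work, such as requesting tasks......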




$ go build -buildmode=plugin ../mrapps/wc.go


sh test-mr.sh


  • The map phase should divide the intermediate keys into buckets for nReduce reduce tasks, where nReduce is the argument that main/mrmaster.go passes to MakeMaster().


  • The worker implementation should put the output of the X'th reduce task in the file mr-out-X.


  • A mr-out-X file should contain one line per Reduce function output. The line should be generated with the Go "%v %v" format, called with the key and value. Have a look in main/mrsequential.go for the line commented "this is the correct format". The test script will fail if your implementation deviates too much from this format.


  • You can modify mr/worker.go, mr/master.go, and mr/rpc.go. You can temporarily modify other files for testing, but make sure your code works with the original versions; we'll test with the original versions.

    Only three files under the mr directory may be modified: mr/worker.go, mr/master.go, and mr/rpc.go

  • The worker should put intermediate Map output in files in the current directory, where your worker can later read them as input to Reduce tasks.


  • main/mrmaster.go expects mr/master.go to implement a Done() method that returns true when the MapReduce job is completely finished; at that point, mrmaster.go will exit.


  • When the job is completely finished, the worker processes should exit. A simple way to implement this is to use the return value from call(): if the worker fails to contact the master, it can assume that the master has exited because the job is done, and so the worker can terminate too. Depending on your design, you might also find it helpful to have a "please exit" pseudo-task that the master can give to workers.



  • One way to get started is to modify mr/worker.go's Worker() to send an RPC to the master asking for a task. Then modify the master to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

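    Have Worker() in worker.go send an RPC to the master asking for a task; the master replies with a map task that has not started yet; then write the worker code that reads the file and calls the map function, as in mrsequential.go.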

  • This lab relies on the workers sharing a file system. That's straightforward when all workers run on the same machine, but would require a global filesystem like GFS if the workers ran on different machines.


  • A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.

    The intermediate files saved after map for use by reduce can be named mr-X-Y, where X is the map task number and Y is the reduce task number.

  • The worker's map task code will need a way to store intermediate key/value pairs in files in a way that can be correctly read back during reduce tasks. One possibility is to use Go's encoding/json package. To write key/value pairs to a JSON file:



      enc := json.NewEncoder(file)
      for _, kv := ... {
        err := enc.Encode(&kv)
      }

    and to read such a file back:

      dec := json.NewDecoder(file)
      for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
          break
        }
        kva = append(kva, kv)
      }

  • The map part of your worker can use the ihash(key) function (in worker.go) to pick the reduce task for a given key.

    The map side can use ihash(key) to decide which reduce task handles a given key.

  • You can steal some code from mrsequential.go for reading Map input files, for sorting intermediate key/value pairs between the Map and Reduce, and for storing Reduce output in files.

    Code from mrsequential.go can be reused for reading Map input files, for sorting between Map and Reduce, and for storing Reduce output in files.

  • The master, as an RPC server, will be concurrent; don't forget to lock shared data.

    The master is an RPC server and handles requests concurrently, so remember to lock shared data.

  • Use Go's race detector, with go build -race and go run -race. test-mr.sh has a comment that shows you how to enable the race detector for the tests.


  • Workers will sometimes need to wait, e.g. reduces can't start until the last map has finished. One possibility is for workers to periodically ask the master for work, sleeping with time.Sleep() between each request. Another possibility is for the relevant RPC handler in the master to have a loop that waits, either with time.Sleep() or sync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting won't prevent the master from processing other RPCs.


  • The master can't reliably distinguish between crashed workers, workers that are alive but have stalled for some reason, and workers that are executing but too slowly to be useful. The best you can do is have the master wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the master wait for ten seconds; after that the master should assume the worker has died (of course, it might not have).


  • To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.


  • test-mr.sh runs all the processes in the sub-directory mr-tmp, so if something goes wrong and you want to look at intermediate or output files, look there.
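Two of the hints above, the mr-X-Y naming convention and the temporary-file-then-rename trick, combine naturally. The sketch below is one possible shape, not the lab's required code: `intermediateName`, `writeAtomically`, and `demo` are hypothetical helpers, and `os.CreateTemp` (Go 1.16+) is used as the successor of `ioutil.TempFile`. The temporary file is created in the destination directory so that the rename stays within one filesystem.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// intermediateName follows the mr-X-Y convention (X: map task, Y: reduce task).
func intermediateName(mapTask, reduceTask int) string {
	return fmt.Sprintf("mr-%d-%d", mapTask, reduceTask)
}

// writeAtomically writes data to a temporary file in dir and renames it to
// dir/name only after the content is complete.
func writeAtomically(dir, name string, data []byte) error {
	tmp, err := os.CreateTemp(dir, "mr-tmp-*")
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

// demo writes one intermediate file into a scratch directory and reads it back.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "mr-demo-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	name := intermediateName(2, 5)
	if err := writeAtomically(dir, name, []byte("hello\n")); err != nil {
		return "", err
	}
	got, err := os.ReadFile(filepath.Join(dir, name))
	return string(got), err
}

func main() {
	content, err := demo()
	fmt.Printf("%q %v\n", content, err)
}
```

A reader that opens mr-2-5 therefore sees either no file at all or the complete content, never a half-written one.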






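type Coordinator struct {
    // Your definitions here.
    Filenames    []string
    NReduce      int
    NMap         int
    MapTasks     chan Task // map tasks waiting to be handed out
    ReduceTasks  chan Task // reduce tasks waiting to be handed out
    RunningTasks []Task    // tasks handed out but not yet reported finished
}

type Task struct {
    TaskType  string // "map", "reduce", "none" or "exit"
    Filename  string
    TaskIndex int
    // Time the task was handed out; a running task is removed after 10s.
    StartTime time.Time
    WorkId    int
}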

There is also a mutex, which guarantees that only one goroutine at a time can access a shared variable:

var mu sync.Mutex


type AskingArgs struct {
    AskingType string // "asking" or "finish"
    WorkId     int    // unique worker id
    TaskType   string // "map" or "reduce"
    TaskIndex  int
}

type Reply struct {
    XTask   Task
    NReduce int
    NMap    int
}




// create a Coordinator.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{
        MapTasks:    make(chan Task, len(files)),
        ReduceTasks: make(chan Task, nReduce),
        NReduce:     nReduce,
        NMap:        len(files),
        Filenames:   files,
    }
    // Queue one map task per input file and one reduce task per partition.
    for i, file := range files {
        task := Task{TaskType: "map", Filename: file, TaskIndex: i}
        c.MapTasks <- task
    }
    for i := 0; i < nReduce; i++ {
        task := Task{TaskType: "reduce", TaskIndex: i}
        c.ReduceTasks <- task
    }
    c.server()
    return &c
}


    go func() {
        for {
            time.Sleep(time.Second / 10)
            mu.Lock()
            // Keep tasks still within the 10s limit; re-queue the rest.
            alive := c.RunningTasks[:0]
            for _, task := range c.RunningTasks {
                if time.Since(task.StartTime).Seconds() < 10 {
                    alive = append(alive, task)
                } else if task.TaskType == "map" {
                    c.MapTasks <- task
                } else if task.TaskType == "reduce" {
                    c.ReduceTasks <- task
                }
            }
            c.RunningTasks = alive
            mu.Unlock()
        }
    }()




workID, _ := rand.Int(rand.Reader, big.NewInt(100000000))



taskType := "none"
for taskType != "exit" {


args := AskingArgs{}
args.WorkId = int(workID.Int64())
args.AskingType = "asking"
workID := int(workID.Int64())
reply := Reply{}
Call(args.WorkId, &args, &reply)
taskType = reply.XTask.TaskType
taskIndex := reply.XTask.TaskIndex


if reply.XTask.TaskType == "map" {
    fileName := reply.XTask.Filename
    file, err := os.Open(fileName)
    if err != nil {
        log.Fatalf("cannot open %v", fileName)
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
        log.Fatalf("cannot read %v", fileName)
    }
    file.Close()
    kva := mapf(fileName, string(content))
    // Split the map output into NReduce buckets, one per reduce task.
    var buf [][]KeyValue
    for i := 0; i < reply.NReduce; i++ {
        buf = append(buf, []KeyValue{})
    }
    for _, value := range kva {
        hashKey := ihash(value.Key)
        num := hashKey % reply.NReduce
        buf[num] = append(buf[num], value)
    }


for i := 0; i < reply.NReduce; i++ {
    sort.Sort(ByKey(buf[i]))
}


for i := 0; i < reply.NReduce; i++ {
    // Write to a temporary file in the current directory, then rename it,
    // so that a crash never leaves a partially written mr-X-Y file.
    tmpFile, err := ioutil.TempFile(".", "mr-map-*")
    if err != nil {
        log.Fatalf("can't open tempFile")
    }
    enc := json.NewEncoder(tmpFile)
    if err := enc.Encode(buf[i]); err != nil {
        log.Fatalf("fail to encode")
    }
    tmpFile.Close()
    outFileName := "mr-" + strconv.Itoa(reply.XTask.TaskIndex) + "-" + strconv.Itoa(i)
    os.Rename(tmpFile.Name(), outFileName)
}


} else if reply.XTask.TaskType == "reduce" {
    numMap := reply.NMap
    var intermediate []KeyValue
    for i := 0; i < numMap; i++ {
        mapFileName := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.XTask.TaskIndex)
        inputFile, err := os.OpenFile(mapFileName, os.O_RDONLY, 0777)
        if err != nil {
            log.Fatalf("can't open reduceFile %v", mapFileName)
        }
        dec := json.NewDecoder(inputFile)
        for {
            // The map side encoded each bucket as a single JSON array.
            var kv []KeyValue
            if err := dec.Decode(&kv); err != nil {
                break
            }
            intermediate = append(intermediate, kv...)
        }
        inputFile.Close()
    }
    // Sort so that equal keys coming from different map files are adjacent.
    sort.Sort(ByKey(intermediate))
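For comparison, the per-pair JSON format suggested in the lab hints can be exercised without touching the disk. The following sketch round-trips pairs through a `bytes.Buffer` and treats `io.EOF` as the normal end of input, while other decode errors are reported. `encodeAll`, `decodeAll`, and `roundTrip` are hypothetical helper names.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// KeyValue is one intermediate pair.
type KeyValue struct {
	Key   string
	Value string
}

// encodeAll writes one JSON object per pair.
func encodeAll(w io.Writer, kva []KeyValue) error {
	enc := json.NewEncoder(w)
	for _, kv := range kva {
		if err := enc.Encode(&kv); err != nil {
			return err
		}
	}
	return nil
}

// decodeAll reads pairs until the stream ends.
func decodeAll(r io.Reader) ([]KeyValue, error) {
	dec := json.NewDecoder(r)
	var kva []KeyValue
	for {
		var kv KeyValue
		err := dec.Decode(&kv)
		if err == io.EOF {
			return kva, nil // clean end of input
		}
		if err != nil {
			return nil, err // malformed data
		}
		kva = append(kva, kv)
	}
}

// roundTrip encodes the pairs into memory and decodes them again.
func roundTrip(kva []KeyValue) ([]KeyValue, error) {
	var buf bytes.Buffer
	if err := encodeAll(&buf, kva); err != nil {
		return nil, err
	}
	return decodeAll(&buf)
}

func main() {
	got, err := roundTrip([]KeyValue{{"a", "1"}, {"b", "1"}})
	fmt.Println(got, err)
}
```

Either format works as long as the map side and the reduce side agree; the code in these notes writes each bucket as one JSON array instead.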

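Separate out the key/value pairs that share the same word, write the results to a temporary file named mr-reduce-*, and rename it to mr-out-* once all output has been written: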

outFileName := "mr-out-" + strconv.Itoa(reply.XTask.TaskIndex)
tmpFile, err := ioutil.TempFile(".", "mr-reduce-*")
if err != nil {
    log.Fatalf("can't open tempFile")
}
i := 0
for i < len(intermediate) {
    // Find the run [i, j) of entries that share intermediate[i].Key.
    j := i + 1
    for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
        j++
    }
    values := []string{}
    for k := i; k < j; k++ {
        values = append(values, intermediate[k].Value)
    }
    output := reducef(intermediate[i].Key, values)
    // this is the correct format for each line of Reduce output.
    fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)
    i = j
}
tmpFile.Close()
os.Rename(tmpFile.Name(), outFileName)
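The grouping loop only works when equal keys are adjacent, which is why the data has to be sorted first. The self-contained sketch below runs the same two-index scan on an in-memory slice; `groupAndReduce` and `count` are hypothetical names, and `sort.Slice` stands in for the `ByKey` type used in these notes.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// KeyValue is one intermediate pair.
type KeyValue struct {
	Key   string
	Value string
}

// groupAndReduce sorts by key, then calls reducef once per distinct key and
// returns one "key value" line per call.
func groupAndReduce(kva []KeyValue, reducef func(string, []string) string) []string {
	sort.Slice(kva, func(a, b int) bool { return kva[a].Key < kva[b].Key })
	var lines []string
	i := 0
	for i < len(kva) {
		// Advance j to the end of the run of entries equal to kva[i].Key.
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		lines = append(lines, fmt.Sprintf("%v %v", kva[i].Key, reducef(kva[i].Key, values)))
		i = j
	}
	return lines
}

// count is a word-count style reduce function.
func count(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	lines := groupAndReduce([]KeyValue{{"b", "1"}, {"a", "1"}, {"b", "1"}}, count)
	fmt.Println(lines)
}
```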


            args := AskingArgs{}
            args.WorkId = workID
            args.AskingType = "finish"
            args.TaskType = reply.XTask.TaskType
            args.TaskIndex = taskIndex
            reply := Reply{}
            Call(workID, &args, &reply)


func Call(workId int, args *AskingArgs, reply *Reply) {
    // fill in the argument(s).
    args.WorkId = workId
    ok := call("Coordinator.GetTask", args, reply)
    if !ok {
        fmt.Printf("call failed!\n")
    }
}
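To see the request/reply shape of such a call in isolation, the sketch below runs a `net/rpc` server and client inside one process over `net.Pipe`, so no socket file is involved. The `Coordinator`, `AskingArgs`, and `Reply` types here are simplified stand-ins (assumptions), and `askForTask` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// AskingArgs is a simplified request.
type AskingArgs struct {
	AskingType string
	WorkId     int
}

// Reply is a simplified response.
type Reply struct {
	TaskType  string
	TaskIndex int
}

// Coordinator is a stand-in RPC service with a single handler.
type Coordinator struct{}

// GetTask always hands out map task 7 in this sketch.
func (c *Coordinator) GetTask(args *AskingArgs, reply *Reply) error {
	reply.TaskType = "map"
	reply.TaskIndex = 7
	return nil
}

// askForTask serves one in-memory connection and performs a single call.
func askForTask(workerID int) (Reply, error) {
	server := rpc.NewServer()
	if err := server.Register(&Coordinator{}); err != nil {
		return Reply{}, err
	}
	clientConn, serverConn := net.Pipe()
	go server.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	args := AskingArgs{AskingType: "asking", WorkId: workerID}
	var reply Reply
	err := client.Call("Coordinator.GetTask", &args, &reply)
	return reply, err
}

func main() {
	reply, err := askForTask(42)
	fmt.Println(reply.TaskType, reply.TaskIndex, err)
}
```

The method name passed to `Call` is the registered type name followed by the method name, which is why the worker in these notes calls "Coordinator.GetTask".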




mu.Lock()
defer mu.Unlock()
reply.NReduce = c.NReduce
reply.NMap = c.NMap


if args.AskingType == "finish" {
    for i, task := range c.RunningTasks {
        if task.TaskType == args.TaskType && task.TaskIndex == args.TaskIndex {
            copy(c.RunningTasks[i:], c.RunningTasks[i+1:])
            c.RunningTasks = c.RunningTasks[:len(c.RunningTasks)-1]
            break // the slice changed, so stop iterating
        }
    }
    return nil
}


if len(c.MapTasks) > 0 {
    // Hand out a pending map task and record it as running.
    reply.XTask.TaskType = "map"
    thisTask := <-c.MapTasks
    reply.XTask.TaskIndex = thisTask.TaskIndex
    reply.XTask.StartTime = time.Now()
    reply.XTask.Filename = thisTask.Filename
    thisTask.StartTime = time.Now()
    thisTask.WorkId = args.WorkId
    c.RunningTasks = append(c.RunningTasks, thisTask)
    return nil
}

If any map task is still running, reduce tasks cannot start yet, so the reply carries taskType none:

// Reduce tasks cannot start while any map task is still running.
hasMap := false
for _, task := range c.RunningTasks {
    if task.TaskType == "map" {
        hasMap = true
    }
}
if hasMap {
    reply.XTask.TaskType = "none"
    return nil
}


if len(c.ReduceTasks) > 0 {
    // Hand out a pending reduce task and record it as running.
    reply.XTask.TaskType = "reduce"
    thisTask := <-c.ReduceTasks
    reply.XTask.TaskIndex = thisTask.TaskIndex
    reply.XTask.StartTime = time.Now()
    thisTask.StartTime = time.Now()
    thisTask.WorkId = args.WorkId
    c.RunningTasks = append(c.RunningTasks, thisTask)
    return nil
}


if len(c.MapTasks) > 0 || len(c.ReduceTasks) > 0 || len(c.RunningTasks) > 0 {
    // Work remains but nothing can be handed out right now.
    reply.XTask.TaskType = "none"
    return nil
}

Once all Map and Reduce tasks are finished, return the "please exit" task and remove the worker from the running set:

// All Map and Reduce tasks are finished: tell the worker to exit.
reply.XTask.TaskType = "exit"
return nil
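The order of the checks in the handler can be summarized as a pure decision function. This is a simplified model of the logic shown above, not a replacement for it; the `counts` struct and `nextTaskType` are hypothetical names.

```go
package main

import "fmt"

// counts summarizes the coordinator's queues at one instant.
type counts struct {
	PendingMap    int
	RunningMap    int
	PendingReduce int
	RunningReduce int
}

// nextTaskType mirrors the order of the checks: pending maps first, then wait
// for running maps, then pending reduces, then wait for running reduces.
func nextTaskType(c counts) string {
	switch {
	case c.PendingMap > 0:
		return "map"
	case c.RunningMap > 0:
		return "none" // reduces must wait for the last map
	case c.PendingReduce > 0:
		return "reduce"
	case c.RunningReduce > 0:
		return "none" // nothing left to hand out, but the job is not done
	default:
		return "exit"
	}
}

func main() {
	fmt.Println(nextTaskType(counts{PendingMap: 2}))
	fmt.Println(nextTaskType(counts{RunningMap: 1, PendingReduce: 3}))
	fmt.Println(nextTaskType(counts{PendingReduce: 3}))
	fmt.Println(nextTaskType(counts{}))
}
```

Writing the rule this way makes the barrier between the map phase and the reduce phase explicit: a reduce task is handed out only when no map task is pending or running.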


// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
    mu.Lock()
    defer mu.Unlock()
    // The job is finished once no task is pending or running.
    return len(c.RunningTasks) == 0 &&
        len(c.MapTasks) == 0 &&
        len(c.ReduceTasks) == 0
}




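The version above already passes all tests. I thought about heartbeats because this lab is only a small job: with a large workload, you cannot decide that a worker has crashed solely because a task did not finish within a fixed time. If a worker dies very early, it takes a long time to notice. Hence the heartbeat.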






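package mr

import (
    "log"
    "net"
    "net/http"
    "net/rpc"
    "os"
    "sync"
    "time"
)

type Coordinator struct {
    // Your definitions here.
    Filenames      []string
    NReduce        int
    NMap           int
    MapTasks       chan Task // map tasks waiting to be handed out
    ReduceTasks    chan Task // reduce tasks waiting to be handed out
    RunningTasks   []Task    // tasks handed out but not yet reported finished
    WorkerStatuses []WorkerStatus
}

type Task struct {
    TaskType  string // "map", "reduce", "none" or "exit"
    Filename  string
    TaskIndex int
    // Time the task was handed out; a running task is removed after 10s.
    StartTime time.Time
    WorkId    int
}

type WorkerStatus struct {
    LastHearten time.Time // time of the most recent heartbeat
    WorkerId    int
}

// mu guards the Coordinator fields shared between RPC handlers.
var mu sync.Mutex
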
// Your code here -- RPC handlers for the worker to call.
func (c *Coordinator) GetTask(args *AskingArgs, reply *Reply) error {
    mu.Lock()
    defer mu.Unlock()
    reply.NReduce = c.NReduce
    reply.NMap = c.NMap
    if args.AskingType == "finish" {
        for i, task := range c.RunningTasks {
            if task.TaskType == args.TaskType && task.TaskIndex == args.TaskIndex {
                copy(c.RunningTasks[i:], c.RunningTasks[i+1:])
                c.RunningTasks = c.RunningTasks[:len(c.RunningTasks)-1]
                break // the slice changed, so stop iterating
            }
        }
        return nil
    }
    if len(c.MapTasks) > 0 {
        reply.XTask.TaskType = "map"
        thisTask := <-c.MapTasks
        reply.XTask.TaskIndex = thisTask.TaskIndex
        reply.XTask.StartTime = time.Now()
        reply.XTask.Filename = thisTask.Filename
        thisTask.StartTime = time.Now()
        thisTask.WorkId = args.WorkId
        c.RunningTasks = append(c.RunningTasks, thisTask)
        return nil
    }
    // Reduce tasks cannot start while any map task is still running.
    hasMap := false
    for _, task := range c.RunningTasks {
        if task.TaskType == "map" {
            hasMap = true
        }
    }
    if hasMap {
        reply.XTask.TaskType = "none"
        return nil
    }
    if len(c.ReduceTasks) > 0 {
        reply.XTask.TaskType = "reduce"
        thisTask := <-c.ReduceTasks
        reply.XTask.TaskIndex = thisTask.TaskIndex
        reply.XTask.StartTime = time.Now()
        thisTask.StartTime = time.Now()
        thisTask.WorkId = args.WorkId
        c.RunningTasks = append(c.RunningTasks, thisTask)
        return nil
    }
    if len(c.MapTasks) > 0 || len(c.ReduceTasks) > 0 || len(c.RunningTasks) > 0 {
        reply.XTask.TaskType = "none"
        return nil
    }
    // All Map and Reduce tasks are finished: return the "please exit" task
    // and stop tracking this worker.
    reply.XTask.TaskType = "exit"
    for i, worker := range c.WorkerStatuses {
        if worker.WorkerId == args.WorkId {
            copy(c.WorkerStatuses[i:], c.WorkerStatuses[i+1:])
            c.WorkerStatuses = c.WorkerStatuses[:len(c.WorkerStatuses)-1]
            break
        }
    }
    return nil
}

func (c *Coordinator) HeartBeat(args *AskingArgs, reply *Reply) error {
    mu.Lock()
    defer mu.Unlock()
    hasWorker := false
    // Index into the slice: a range value variable would only be a copy.
    for i := range c.WorkerStatuses {
        if c.WorkerStatuses[i].WorkerId == args.WorkId {
            c.WorkerStatuses[i].LastHearten = time.Now()
            hasWorker = true
        }
    }
    if !hasWorker {
        worker := WorkerStatus{}
        worker.WorkerId = args.WorkId
        worker.LastHearten = time.Now()
        c.WorkerStatuses = append(c.WorkerStatuses, worker)
    }
    return nil
}

// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    //l, e := net.Listen("tcp", ":1234")
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)
}

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
    mu.Lock()
    defer mu.Unlock()
    ret := false
    // Your code here.
    // The job has finished once nothing is pending, running, or tracked.
    if len(c.RunningTasks) == 0 &&
        len(c.MapTasks) == 0 &&
        len(c.ReduceTasks) == 0 &&
        len(c.WorkerStatuses) == 0 {
        ret = true
    }
    return ret
}

// create a Coordinator.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{
        MapTasks:    make(chan Task, len(files)),
        ReduceTasks: make(chan Task, nReduce),
        NReduce:     nReduce,
        NMap:        len(files),
        Filenames:   files,
    }
    for i, file := range files {
        task := Task{TaskType: "map", Filename: file, TaskIndex: i}
        c.MapTasks <- task
    }
    for i := 0; i < nReduce; i++ {
        task := Task{TaskType: "reduce", TaskIndex: i}
        c.ReduceTasks <- task
    }
    go func() {
        for {
            time.Sleep(time.Second / 10)
            mu.Lock()
            // Re-queue tasks that have been running for 10s or more.
            running := c.RunningTasks[:0]
            for _, task := range c.RunningTasks {
                if time.Since(task.StartTime).Seconds() < 10 {
                    running = append(running, task)
                } else if task.TaskType == "map" {
                    c.MapTasks <- task
                } else if task.TaskType == "reduce" {
                    c.ReduceTasks <- task
                }
            }
            c.RunningTasks = running
            // Drop crashed workers: no heartbeat for 10s or more.
            alive := c.WorkerStatuses[:0]
            for _, status := range c.WorkerStatuses {
                if time.Since(status.LastHearten).Seconds() < 10 {
                    alive = append(alive, status)
                }
            }
            c.WorkerStatuses = alive
            mu.Unlock()
        }
    }()
    c.server()
    return &c
}


package mr

// RPC definitions.
// remember to capitalize all names.

import (
    "os"
    "strconv"
)

// Add your RPC definitions here.
type AskingArgs struct {
    AskingType string // "asking" or "finish"
    WorkId     int    // unique worker id
    TaskType   string // "map" or "reduce"
    TaskIndex  int
}

type Reply struct {
    XTask   Task
    NReduce int
    NMap    int
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
    s := "/var/tmp/824-mr-"
    s += strconv.Itoa(os.Getuid())
    return s
}


package mr

import (
    "crypto/rand"
    "encoding/json"
    "fmt"
    "hash/fnv"
    "io/ioutil"
    "log"
    "math/big"
    "net/rpc"
    "os"
    "sort"
    "strconv"
)

// KeyValue
// Map functions return a slice of KeyValue.
type KeyValue struct {
    Key   string
    Value string
}

// ByKey for sorting by key.
type ByKey []KeyValue

// Len for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    // Your worker implementation here.
    // Pick a random id so the coordinator can tell workers apart.
    id, _ := rand.Int(rand.Reader, big.NewInt(100000000))
    workID := int(id.Int64())
    // uncomment to send the Example RPC to the coordinator.
    // CallExample()
    taskType := "none"
    for taskType != "exit" {
        args := AskingArgs{}
        args.WorkId = workID
        args.AskingType = "asking"
        reply := Reply{}
        CallTask(workID, &args, &reply)
        taskType = reply.XTask.TaskType
        taskIndex := reply.XTask.TaskIndex
        if reply.XTask.TaskType == "map" {
            fileName := reply.XTask.Filename
            file, err := os.Open(fileName)
            if err != nil {
                log.Fatalf("cannot open %v", fileName)
            }
            content, err := ioutil.ReadAll(file)
            if err != nil {
                log.Fatalf("cannot read %v", fileName)
            }
            file.Close()
            kva := mapf(fileName, string(content))
            // Split the map output into NReduce buckets, one per reduce task.
            var buf [][]KeyValue
            for i := 0; i < reply.NReduce; i++ {
                buf = append(buf, []KeyValue{})
            }
            for _, value := range kva {
                hashKey := ihash(value.Key)
                num := hashKey % reply.NReduce
                buf[num] = append(buf[num], value)
            }
            for i := 0; i < reply.NReduce; i++ {
                sort.Sort(ByKey(buf[i]))
            }
            // Use a throwaway reply so the task held in reply stays untouched.
            CallHeartBeat(workID, &args, &Reply{})
            for i := 0; i < reply.NReduce; i++ {
                // Write to a temporary file in the current directory, then
                // rename it, so no partially written mr-X-Y file is visible.
                tmpFile, err := ioutil.TempFile(".", "mr-map-*")
                if err != nil {
                    log.Fatalf("can't open tempFile")
                }
                enc := json.NewEncoder(tmpFile)
                if err := enc.Encode(buf[i]); err != nil {
                    log.Fatalf("fail to encode")
                }
                tmpFile.Close()
                outFileName := "mr-" + strconv.Itoa(reply.XTask.TaskIndex) + "-" + strconv.Itoa(i)
                os.Rename(tmpFile.Name(), outFileName)
            }
        } else if reply.XTask.TaskType == "reduce" {
            numMap := reply.NMap
            var intermediate []KeyValue
            for i := 0; i < numMap; i++ {
                mapFileName := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.XTask.TaskIndex)
                inputFile, err := os.OpenFile(mapFileName, os.O_RDONLY, 0777)
                if err != nil {
                    log.Fatalf("can't open reduceFile %v", mapFileName)
                }
                dec := json.NewDecoder(inputFile)
                for {
                    // The map side encoded each bucket as a single JSON array.
                    var kv []KeyValue
                    if err := dec.Decode(&kv); err != nil {
                        break
                    }
                    intermediate = append(intermediate, kv...)
                }
                inputFile.Close()
            }
            // Sort so that equal keys from different map files are adjacent.
            sort.Sort(ByKey(intermediate))
            CallHeartBeat(workID, &args, &Reply{})
            outFileName := "mr-out-" + strconv.Itoa(reply.XTask.TaskIndex)
            tmpFile, err := ioutil.TempFile(".", "mr-reduce-*")
            if err != nil {
                log.Fatalf("can't open tempFile")
            }
            i := 0
            for i < len(intermediate) {
                // Find the run [i, j) of entries sharing intermediate[i].Key.
                j := i + 1
                for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
                    j++
                }
                values := []string{}
                for k := i; k < j; k++ {
                    values = append(values, intermediate[k].Value)
                }
                output := reducef(intermediate[i].Key, values)
                // this is the correct format for each line of Reduce output.
                fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)
                i = j
            }
            tmpFile.Close()
            os.Rename(tmpFile.Name(), outFileName)
        }
        // Report the task as finished. Separate variable names avoid
        // redeclaring args and reply in the same scope.
        finishArgs := AskingArgs{}
        finishArgs.WorkId = workID
        finishArgs.AskingType = "finish"
        finishArgs.TaskType = reply.XTask.TaskType
        finishArgs.TaskIndex = taskIndex
        CallTask(workID, &finishArgs, &Reply{})
    }
}

func CallTask(workId int, args *AskingArgs, reply *Reply) {
    // fill in the argument(s).
    args.WorkId = workId
    ok := call("Coordinator.GetTask", args, reply)
    if !ok {
        fmt.Printf("call failed!\n")
    }
}

func CallHeartBeat(workId int, args *AskingArgs, reply *Reply) {
    // fill in the argument(s).
    args.WorkId = workId
    ok := call("Coordinator.HeartBeat", args, reply)
    if !ok {
        fmt.Printf("call failed!\n")
    }
}

// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
    sockname := coordinatorSock()
    //log.Println("sockname is " + sockname)
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()
    err = c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }
    return false
}
