MapReduce

MapReduce

@author qcliu

@time 2015/07/29

Abstract

MIT6.824 Lab1
总结。

Part I: Word count

第一个任务是单机版词频统计。入口为wc.gomain()。 统计的过程分为4个部分:

  • Split
  • Map
  • Reduce
  • Merge

Split

将一个大文件切分成若干个小文件。 采用逐行扫描的方式。
涉及到

Map

经过Spilt过程,kjv12.txt(file size:4834757)已经被切分成了5个文件。

mrtmp.kjv12.txt-0 966967
mrtmp.kjv12.txt-1 966941
mrtmp.kjv12.txt-2 966974
mrtmp.kjv12.txt-3 966970
mrtmp.kjv12.txt-4 966905

之后执行DoMap()操作。

for i:=0; i<nMap; i++ {
    DoMap(i, mr.file, mr.NReduce, Map)
}

对每一个子文件进行DoMap()。
在DoMap()中,将子文件的内容读到内存中的bype[]。对bype[]进行Map()操作。

Map()是对传入的string进行单词切分,然后建立一个HashMap,key是单词,value是单词出现的频率。这里可以采用bit方式记录词频,即1111代表4次。

这里Go提供了一个很好用的分割字符串的库。
strings.FieldsFunc

得到子文件中每个单词及其出现的频率后,将这份hashMap以JSON格式存入nreduce个文件中,这里nreduce为3。

对于文件mrtmp.kjv12.txt-0 966967,会被分为3个文件

mrtmp.kjv12.txt-0-0
mrtmp.kjv12.txt-0-1
mrtmp.kjv12.txt-0-2

GoJson

Reduce

在进入Reduce阶段时,已经存在了nmap*nreduce个文件。

mrtmp.kjv12.txt-0-0
mrtmp.kjv12.txt-0-1
mrtmp.kjv12.txt-0-2
mrtmp.kjv12.txt-1-0
mrtmp.kjv12.txt-1-1
mrtmp.kjv12.txt-1-2
mrtmp.kjv12.txt-2-0
mrtmp.kjv12.txt-2-1
mrtmp.kjv12.txt-2-2
mrtmp.kjv12.txt-3-0
mrtmp.kjv12.txt-3-1
mrtmp.kjv12.txt-3-2
mrtmp.kjv12.txt-4-0
mrtmp.kjv12.txt-4-1
mrtmp.kjv12.txt-4-2

在Reduce时,进行nreduce次DoReduce(), 每一次里面都打开nmap个文件。

在一次DoReduce()中,对nmap个文件建立一个map[string]*list.List,用于存放nmap个文件中不同单词以及在不同文件中出现的频率,这里的频率是用字符串长度表示的。将nmap个文件中单词出现的频率进行合计,用JSON格式写入文件中。nreduce次DoReduce()总共产生nreduce个merge文件。

Merge

Merge()时已经存在nreduce个文件,此处nreduce为3.每个文件中存放着若干单词和对应的频率。

Merge()操作对3个文件的内容再一次进行合计。用一个hashMap存放3个文件中的单词和对应的频率。将最终的结果写入文件

Part II: MapReduce

MapReduce
partII是真正的分布式的MapReduce。对应于上图,test_test.go是我们的UserProgram,在setup()中开启的master,之后开启了2个Worker。

javafunc TestBasic(t *testing.T) {
    InitLog()
    Logger.Printf("Test: Basic mapreduce ...\n")
    mr := setup()
    for i := 0; i < 2; i++ {
        go RunWorker(mr.MasterAddress, port("worker"+strconv.
        (i)),MapFunc, ReduceFunc, -1)
    }
    // Wait until MR is done
    <-mr.DoneChannel
    check(t, mr.file)
    checkWorker(t, mr.stats)
    cleanup(mr)
    fmt.Printf("  ... Basic Passed\n")
}

Master

master是负责切分文件,给Worker分配任务,合并结果。在开启Master时,将MapReduce通过rpc暴露给Worker。然后等待Worker注册。此时Worker可以调用MapReduce的Register(),从而通知Master有Worker可用。

Worker

开启Worker时,也通过RPC将Worker暴露给Master,之后通过RPC调用Register(),通知Master有Worker可用。等待Master分配任务。

Example

在mapreduce.go中,StarRegistrationServer()负责Master监听是否有Worker进行RPC调用。

javafunc (mr *MapReduce) StartRegistrationServer() {
    rpcs := rpc.NewServer()
    rpcs.Register(mr)//take mr expose to client
    os.Remove(mr.MasterAddress) // only needed for "unix"
    l, e := net.Listen("unix", mr.MasterAddress)
    if e != nil {
        log.Fatal("RegstrationServer", mr.MasterAddress, " error: ", e)
    }
    mr.l = l
    // now that we are listening on the master address, can fork off
    // accepting connections to another thread.
    go func() {
        for mr.alive {
            conn, err := mr.l.Accept()
            Logger.Printf("accepted %T from worker\n", conn)
            if err == nil {
                go func() {
                    rpcs.ServeConn(conn)
                    conn.Close()
                }()
            } else {
                DPrintf("RegistrationServer: accept error", err)
                break
            }
        }
        DPrintf("RegistrationServer: done\n")
    }()
}

其中,mr.l.Accept()会阻塞。如果Worker此时通过RPC调用Register()

javafunc call(srv string, rpcname string, args interface{}, reply interface{}) bool {
    c, errx := rpc.Dial("unix", srv)
    if errx != nil {
        return false
    }
    defer c.Close()
    err := c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }
    fmt.Println(err)
    return false
}
  1. srv为Master的地址。当Worker执行rpc.Dial("unix", srv)时,Master会收到这个连接请求。
  2. 于是conn, err := mr.l.Accept()的阻塞结束,到rpcs.ServeConn(conn)时继续阻塞,等待Worker的调用。
  3. 当Worker执行c.Call(rpcname, args, reply),其中rpcname为Master的一个方法名,此处是Register。
  4. 此时Master的rpcs.ServeConn(conn)阻塞结束,调用成功。

Part III: Handling worker failures

worker failures 可能发生在RPC调用的时候。

javafunc doMap(mr *MapReduce, worker chan string) {
    done := make(chan int)
    for i := 0; i<mr.nMap; i++ {
        go func(i int) {
            for avil := range worker {
                args := DoJobArgs_new(mr.file, Map, i, mr.nReduce)
                var reply DoJobReply
                ok := call(avil, "Worker.DoJob", args, &reply)
                if ok {
                    done<-1
                    worker<-avil
                    break// break the for range
                }else {
                    fmt.Printf("No aviliable worker for Map %d\n", args.JobNumber)
                }
            }
        }(i)
    }
    for i :=0; i<mr.nMap; i++ {
        <-done
    }
}

所以用for avil := range worker不停地从channel中取可用的worker来完成响应的工作,只有完成后才break

如果call不会发生错误,那么for avil := range worker完全可以改为avil:=<-worker