MapReduce的执行流程

MapReduce 执行流程:

(shuffle在maptask和reducetask之间)

1、  数据的底层存储:文件在hdfs中以Block的方式存放时,假如分为3块,为了计算逻辑切片的大小,用FileInputFormat类获得get split方法进行切片,理论上一个切片==一个块区,最后一个逻辑切片的大小默认为140.8M。一个切片对应一个maptask任务。

2、  Maptask中运行mapper类的代码,每个maptask有几行调用几次run方法中的map函数,通过context.write(k,v)输出到shuffle中,每次输出一行。

3、  数据先写入到收集器collector(缓冲队列),数据分为元数据和原始数据,再写到环形缓冲区(内存中)默认大小为100M,环形缓冲区中达到溢写阀值80M后会写入到磁盘,预留的20M接收maptask的数据写入,其中当20M写满后,80M还没有释放,写线程会阻塞,直到缓冲区中溢写到磁盘后再次启动,当80M释放时,元数据和原始数据形成背对模式开始新分界线,环形缓冲区中最后内容小于80M时,flush到磁盘。

4、  第一次排序:(快排),数据溢写到磁盘前,会在缓冲区中按照reducetask数量划分成相同数量的分区进行编号排序,按照map输出的key进行排序,同一分区中相同的key在一起,(如有combier,会是map       更加紧凑),最后在本地磁盘生成分好区且排好序的文件。

5、  第二次排序:merge以及sort的过程,本地磁盘的所有溢写文件根据小溢写文件分区而分区,从而得到一个结果文件,每分区数会再次根据map输出的key进行排序(如有combiner会使得重复合并),得到的结果是严格分区有序的,这个过程也是归并排序。

6、  Reduce任务启动,mapper结束后会向MRAPPMaser汇报信息,只要一个maptask就会启动一个reducetask,reducetask的个数由mapred.xite-xml的mapreduce.job.reduces配置决定,或在初始化job时候调用job.setNumReducerTask(int),reducetask的一个线程定期向MRAPPMaster询问mapper输出结果文件位置,并进行数据抓去复制(fetch),同时启动5个默认线程,所有的reducertask复制完map结果文件后,MRAPPMaster通知NodeManager删除Reducetask。复制完成后,reducetask进入merge阶段,循环合并map结果文件,经合并文件得到一个“最终文件”,可能在磁盘也可能在内存中,其余的reducetask分别执行相同的步骤。

7、  Reducetask运行的为Reducer中的run方法,一个reducetask创建一个对象调用一次run方法,run中调用reduce方法,通过context.write(k,v)输出。

8、  Reduce方法先写出到collector(缓冲队列)中再到buffer(内存缓冲区中),达到缓冲区中阀值50M后追加到hdfs文件(part-r-0000)中。

MapReduce的执行流程