MapReduce的执行流程

map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。 shuffle: 洗牌、发牌——(核心机制:数据分区,排序,分组,ComBine,合并等过程)

shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。

维度一,流程维度回顾。从Map输出到Reduce输入。

MapReduce的执行流程
MapReduce的执行流程
MapReduce的执行流程
第一步:InputFormat
InputFormat 在HDFS文件系统中读取要进行计算的数据
输出给Split
第二步:Split
Split 将数据进行逻辑切分,切分成多个任务。
输出给RR
第三步:RR(recordReader)
RR 将切分后的数据转换成key value进行输出
key : 每一行行首字母的偏移量
value: 每一行数据
输出给Map
第四步:Map
接收一条一条的数据(有多少行数据Map运行多少次,输出的次数根据实际业务需求而定)
根域业务需求编写代码
Map的输出是 key value的 list
输出给Shuffle(partition)
---------------------------------------Map-------------------------------------------------------
第五步: partition
partition: 按照一定的规则对 **key value的 list进行分区
输出给Shuffle(sort)

第六步:Sort
Sort :对每个分区内的数据进行排序。
输出给Shuffle(Combiner)
第七步:Combiner
Combiner: 在Map端进行局部聚合(汇总)
目的是为了减少网络带宽的开销
输出给Shuffle(Group)
第八步:Group
Group: 将相同key的key提取出来作为唯一的key
将相同key对应的value提取出来组装成一个value 的List
输出给Shuffle(reduce)
------------------------------------Shuffle--------------------------------------------
第九步:reduce
reduce: 根据业务需求对传入的数据进行汇总计算。
输出给Shuffle(outputFormat)
第十步:outputFormat
outputFormat:将最终的额结果写入HDFS
------------------------------------reduce--------------------------------------------

维度二,内存维度回顾。从Map输出到Reduce输入

MapReduce的执行流程
map输出的数据写入环形缓冲区(内存),缓冲区的默认大小是100M(可修改)。当数据达到阈值(默认0.8-可修改)时,环形缓冲区进行flash,
环形缓冲区:数据在输出的同时,数据也可以写入空余的空间内。
当flash的数据个数达到一定的数量时(默认4个)。对数据进行合并(merge)。

Reduce在Map拷贝数据

Map 输出的结果写入本地,reduce主动发出拷贝进程到Map端拷贝数据。
reduce获取大数据后,将数据写入内存,当数据达到阈值时进行flash.
当flash的个数达到一定的量时,进行合并,最终发送给reduce
MapReduce的执行流程

Shffle阶段的内存与流程到底是个什么关系呢?

1).Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等。
2).Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。
3).Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。
4).Copy阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100M