5. MapReduce中Shuffle
shuffle翻译为中文就叫 ”洗牌“,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。有些抽象,当前不理解也没关系,先了解一下shuffle的流程。
本节内容参考了如下内容
[1] MapReduce:详解Shuffle(copy sort merge)过程 https://blog.****.net/xiaolang85/article/details/8528892
[2] MapReduce shuffle过程详解 https://blog.****.net/u014374284/article/details/49205885
[3] 从零开始学Hadoop大数据分析 6.2.5 Shuffle过程
一、Shuffle概览
Shuffle不是一个单独的任务,它是MapReduce执行中的步骤,Shuffle过程会涉及到上面谈到的Combiner、Sort、Partition三个过程。它横跨MapReduce中的Map端和Reduce端,因此,我们也会将Shuffle分为 Map Shuffle 和 Reduce Shuffle 。
在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map task结果。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来说,我们对Shuffle过程的期望可以有:
- 完整地从map task端拉取数据到reduce 端。
- 在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
- 减少磁盘IO对task执行的影响。
二、Map端的Shuffle
如上图所示,在Map端的shuffle过程是对Map的结果进行分区、排序、分割,然后将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件,分区有序的含义是Map输出的键值对按分区进行排列,具有相同partition值的键值对存储在一起,每个分区里面的键值对又按key值进行升序排列(默认),这里的partition值是指利用Partitioner.getPartition得到的结果,他也是Partitioner分区的依据。上述流程还可以用如下图进行表示:
Collector
Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构Collector中。Collector的大小可以通过io.sort.mb设置,默认大小为100M。该空间不够用就会触发Spill方法。但是一般不会将整个空间占满才触发Spill方法,而是会设置一个一个Spill门限,默认为0.8。当当前占用空间达到Collector空间的80%,就会触发Spill方法。
Sort
Spill被触发后,并不会直接将键值对溢出,而是先调用SortAndSpill方法,按照partition值和key两个关键字升序排序。这样的排序的结果是,键值对 key-value 按照partition值聚簇在一起,同一个partition值,按照key值有序。这里的排序使用的是快速排序。
Spill
Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序键值对 key-value 挨个partition的把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。在这个过程中如果用户配置了combiner类,那么在写之前会先调用combineAndSpill方法,对结果进行进一步合并后再写出。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
Merge
Map任务如果输出数据量很大,可能会进行好几次Spill,会产生很多Spill??.out类型的文件,分布在不同的磁盘上。这个时候,我们就需要将所有的文件合并的Merge过程。
Merge会扫描本地文件,找到所有的的Spill文件,这些Spill文件都是局部有序的(同一个Spill文件按照partition值有序,同一个partition值内按照key值有序)。
然后为merge过程会一个partition一个partition的进行合并输出。对于某个partition来说,从索引列表中查询这个partition对应的所有的键值对 key-value ,也就是一个Segment的数据。
然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。
这样Map端的任务就算是完成了。
三、Reduce端的Shuffle
在Reduce端,shuffle主要分为复制Map输出、排序合并两个阶段。
Copy
Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。Map任务成功完成后,会通知父TaskTracker状态已经更新,TaskTracker进而通知JobTracker(这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker能记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会从此输出对应的TaskTracker上复制输出到本地,而不会等到所有的Map任务结束。
Merge Sort
Copy过来的数据会先放入内存缓冲区中,如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存缓存区中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中,即内存到磁盘merge。在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。
当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓冲区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块。
一个简明版的Shuffle流程
如果你也对大数据感兴趣,欢迎关注公众号 大数据面试学习指北