MapReduce之Shuffle

参考文档:
http://matt33.com/2016/03/02/hadoop-shuffle/#Partition
http://langyu.iteye.com/blog/992916
《hadoop权威指南》(第四版)

1.参数设置

参数名称 默认设置 解释
mapreduce.task.io.sort.mb 100 map和reduce的缓存容量,单位MB
mapreduce.map.sort.spill.percent 0.80 缓存触发溢写的比例
mapreduce.cluster.local.dir ${hadoop.tmp.dir}/mapred/local 临时存放溢写数据的地址
mapreduce.task.io.sort.factor 10 归并操作时,可以同时进行归并操作流的数量
mapreduce.map.combine.minspills 3 map端做归并操作是,如果溢写得到文件数量如果超过该参数,就触发combine
mapreduce.map.output.compress false map端写磁盘是是否对数据进行压缩
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec map端写磁盘是,如果开启了压缩功能,这个参数决定了压缩格式
mapreduce.reduce.shuffle.parallelcopies 5 reduce端复制map端数据时启动的copy线程数量
mapreduce.reduce.shuffle.input.buffer.percent 0.70 reduce端缓冲区大小与JVM堆内存的比例
mapreduce.reduce.shuffle.merge.percent 0.66 触发spill的缓冲区比例
mapreduce.reduce.merge.inmem.threshold 1000 触发sipll的内存阈值,单位MB

2.主要流程

MapReduce之Shuffle

上图是shuffle的主要流程,但是不是很详细这里大概得介绍一下

shuffle是map和reduce在处理数据时候,对数据进行排序
如图所示,排序在map端会进行一次,在reduce端聚合数据后再次进行排序,然后在运行ruduce任务。

下面对排序流程做个详细解释

3.map端详解

MapReduce之Shuffle
如图是map端的排序过程

  1. map端接收数据源上的数据
  2. map将处理过的数据进行partition操作,将partition后得到的信息和map处理得到的信息一起写到内存中,就是图中的Memory Buffer。
  3. 内存中的数据写满了的话,将其溢写(spill)到磁盘上。如果设置了combiner,就进行combine操作。map作业完成后,将缓存内剩余的数据溢写到磁盘上。 写磁盘过程中可以对数据进行压缩(根据配置文件)。
  4. 将磁盘上的数据做合并处理(merge)。产生格式为一个key对应多个value。例如{“aaa”, [5, 8, 2, …]},说明在写在磁盘上的key为aaa的数据分别有5,8,2等等的值。

上面只是大致说了一下过程,接着会对上文中的几个过程做详细解释:

partition操作

所谓的partiton操作就是对map处理过的数据根据key进行分类,分类得到的结果决定了这条数据到底是由哪个reduce来处理(可能有多个reduce)。

举个例子,一条数据的key是aaa,经过partition处理后,得到结果是0,那么代表这条数据要被编号为0个reduce处理。然后Partition的结果和这条数据(包括key和value)将一起被写到内存中等待合并。

spill(溢写)操作

map将数据写到内存中,但是缓存容量是有限的。缓存容量由参数mapreduce.task.io.sort.mb设置。当缓存写满了一定比例(mapreduce.map.sort.spill.percent设置比例多少),例如参数设置了0.8就会在缓存写满百分之80的时候,将这百分之80的内容溢写到磁盘上。同时剩下百分之20的缓存将继续被写入数据,与写入磁盘互不影响。以此重复,所以可能会发生多次溢写。

最后maper计算完毕,会将缓存中剩余没有溢出的数据全部写到磁盘中。

在溢写过程中,如果设置了combiner就会调用它,对数据进行处理,减少写到磁盘中数据量。

merge(合并)

溢写完成后,会在本地磁盘对应目录(mapreduce.cluster.local.dir 设定)生成一个或者多个溢写文件。然后会将多个文件归并起来,生成一个输出文件,就是reduce后面加载的文件。同一个key值的数据会被归并到一起,例如key值为aaa的数据有多个value,归并后格式为{“aaa”, [5, 8, 2, …]}。

如果溢写文件很多,可能会执行多次合并,每次归并操作可以一次对多个流进行合并,最大流个数由参数mapreduce.task.io.sort.factor决定,如果溢出文件的个数非常多,可以适当增加这个参数,提高执行效率。

归并过程中也可能触发combiner,需要看溢写文件的个数,如果文件个数超过mapreduce.map.combine.minspills(设定的个数),就会启动combiner(前提是设置了)。

在写磁盘的过程中,可以对数据进行压缩,默认情况下是不压缩的。mapreduce.map.output.compress参数设置为true就开启了数据压缩功能。mapreduce.map.output.compress.codec参数决定了压缩格式。

4. reduce端详解

MapReduce之Shuffle
上图主要讲了reduce端的排序过程,这段写的不是很好,因为权威指南和网上文档不是很一致,所以有时间可能要看下官方文档或者源码。

  1. reduce端启动一些copy线程,通过HTTP协议的方式从map端拉取数据。复制线程的数量由mapreduce.reduce.shuffle.parallelcopies参数设置。需要注意的是,只要有一个map端工作完成,reduce就开始复制数据了,所以这个过程和map端的计算可以说是并行经行的。

  2. merge阶段。reduce的merge阶段分为2个阶段。
    1)第一个阶段,就是reduce从不同的map端获取到数据写入缓冲区(缓冲区和jvm堆大小比例由mapreduce.reduce.shuffle.input.buffer.percent参数决定),一旦数据超过缓冲区阈值大小(两个参数决定,后面细说),就将数据溢写到磁盘上。
    如果map端使用了压缩,那么在reduce端缓存中会对数据进行解压缩。
    2)第二个阶段,就是溢写出来的文件越来越多,就会对这些文件做merge操作,直到生成最终文件。一次merge合并流最大次数,和map端一样由mapreduce.task.io.sort.factor参数决定。

  3. merge的最后会生成一个文件,大多数情况下存在于磁盘中,但是需要将其放入内存中。当reducer 输入文件已定,整个 Shuffle 阶段才算结束。然后就是 Reducer 执行,把结果放到 HDFS 上。

ps.额外说明
reduce端溢写的阈值由两个参数决定。第一个是mapreduce.reduce.shuffle.merge.percent,这个是个百分比数据,如果超过这个内存百分比就会触发spill。另一个是mapreduce.reduce.merge.inmem.threshold,表示如果超过这个内存就会spill。

5.重点

触发combine的操作有三个地方,一个是在map端的spill过程中,一个是在map端merge,如果需要merge的文件超过设定舒服会触发combine。还有一个是在reduce端进行spill是,会触发。

//其他重点还没想到=-=