hadoop 之mapreduce shuffle map输出过程

一、概要描述 shuffle是MapReduce的一个核心过程,因此没有在提交中描述,而是单独拿出来比较详细的描述。 根据官方的流程图示如下:

hadoop 之mapreduce shuffle map输出过程

 本篇文章中只是想尝试从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取。 在执行每个map task时,无论map方法中执行什么逻辑,最终都是要把输出写到磁盘上。如果没有reduce阶段,则直接输出到hdfs上,如果有有reduce作业,则每个map方法的输出在写磁盘前线在内存中缓存。每个map task都有一个环状的内存缓冲区,存储着map的输出结果,默认100m,在每次当缓冲区快满的时候由一个独立的线程将缓冲区的数据以一个溢出文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有溢出文件做合并,被合并成已分区且已排序的输出文件。然后等待reduce task来拉数据。

二、 流程描述

  1. 在child进程调用到runNewMapper时,会设置output为NewOutputCollector,来负责map的输出。
  2. 在map方法的最后,不管经过什么逻辑的map处理,最终一般都要调用到TaskInputOutputContext的write方法,进而调用到设置的output即NewOutputCollector的write方法
  3. NewOutputCollector其实只是对MapOutputBuffer的一个封装,其write方法调用的是MapOutputBuffer的collect方法。
  4. MapOutputBuffer的collect方法中把key和value序列化后存储在一个环形缓存中,如果缓存满了则会调用startspill方法设置信号量,使得一个独立的线程SpillThread可以对缓存中的数据进行处理。
  5. SpillThread线程的run方法中调用sortAndSpill方法对缓存中的数据进行排序后写溢出文件。
  6. 当map输出完成后,会调用output的close方法。
  7. 在close方法中调用flush方法,对剩余的缓存进行处理,最后调用mergeParts方法,将前面过程的多个溢出文件合并为一个。

(1)分片(split)操作:

split只是将源文件的内容分片形成一系列的 InputSplit,每个 InputSpilt 中存储着对 应分片的数据信息(例如,文件块信息、起始位置、数据长度、所在节点列表…),并不是将源文件分割成多个小文件,每个InputSplit 都由一个 mapper 进行后续处理。

每个分片大小参数是很重要的,splitSize 是组成分片规则很重要的一个参数,该参数由三个值来确定:

minSize:splitSize 的最小值,由 mapred-site.xml 配置文件中 mapred.min.split.size 参数确定。

maxSize:splitSize 的最大值,由 mapred-site.xml 配置文件中mapreduce.jobtracker.split.metainfo.maxsize 参数确定。

blockSize:HDFS 中文件存储的快大小,由 hdfs-site.xml 配置文件中 dfs.block.size 参数确定。

splitSize的确定规则:splitSize=max{minSize,min{maxSize,blockSize}}

数据格式化(Format)操作:

将划分好的 InputSplit 格式化成键值对形式的数据。其中 key 为偏移量,value 是每一行的内容。

值得注意的是,在map任务执行过程中,会不停的执行数据格式化操作,每生成一个键值对就会将其传入 map,进行处理。所以map和数据格式化操作并不存在前后时间差,而是同时进行的。

spacer.gif

2)Map 映射:

是 Hadoop 并行性质发挥的地方。根据用户指定的map过程,MapReduce 尝试在数据所在机器上执行该 map 程序。在 HDFS中,文件数据是被复制多份的,所以计算将会选择拥有此数据的最空闲的节点。

在这一部分,map内部具体实现过程,可以由用户自定义。

3)Shuffle 派发:

Shuffle 过程是指Mapper 产生的直接输出结果,经过一系列的处理,成为最终的 Reducer 直接输入数据为止的整个过程。这是mapreduce的核心过程。该过程可以分为两个阶段:

Mapper 端的Shuffle:由 Mapper 产生的结果并不会直接写入到磁盘中,而是先存储在内存中,当内存中的数据量达到设定的阀值时,一次性写入到本地磁盘中。并同时进行 sort(排序)、combine(合并)、partition(分片)等操作。其中,sort 是把 Mapper 产 生的结果按照 key 值进行排序;combine 是把key值相同的记录进行合并;partition 是把 数据均衡的分配给 Reducer。

Reducer 端的 Shuffle:由于Mapper和Reducer往往不在同一个节点上运行,所以 Reducer 需要从多个节点上下载Mapper的结果数据,并对这些数据进行处理,然后才能被 Reducer处理。

4)Reduce 缩减:

Reducer 接收形式的数据流,形成形式的输出,具体的过程可以由用户自定义,最终结果直接写入hdfs。每个reduce进程会对应一个输出文件,名称以part-开头。