MapReduce作业流程


MapReduce作业流程

HDFS ——> InputFormat 阶段

InputFormat 是一个 Interface ,包含了getSplits(…) 、getRecordReader(…) 两个方法。这两个方法分别完成着以下工作:
    1)方法 getSplits 将输入的数据切分为多个逻辑上的 InputSplit,其中每一个 InputSplit 作为一个 map 的输入,InputSplit 的大小默认为 block 的大小(block 是 HDFS 存储文件的单位,默认为128MB)
    2)方法 RecordReader 将每个 InputSplit 解析成 records,再依次将 record 解析成<K,V>对。RecordReader 决定了 map 任务如何读取输入数据, 例如一行一行的读取还是一个字节一个字节的读取等等。

用户在启动 MapReduce 的时候需要指定一个 InputFormat 的 Implement:
    InputFormat 直接子类有三个:DBInputFormat、DelegatingInputFormat 和FileInputFormat,分别表示输入文件的来源为从数据库、用于多个输入以及基于文件的输入。
    对于FileInputFormat,即从文件输入的输入方式,又有五个继承子类:CombineFileInputFormat,KeyValueTextInput,NLineInoutFormat,SequenceFileInputFormat,TextInputFormat。
    Hadoop 默认的输入方式是 TextInputFormat,它重写了 FileInputFormat 的creatRecordReader 和 isSplit 方法。createRecordReader( ) 方法返回的是 LineRecordReader 对象,该对象读取记录时是按行读取,以回车键和换行符为行分割符,它的 <K,V> 就代表<行开始位置的偏移量,该行的内容>。

InputFormat ——> Split 阶段

InputSplit(输入分片)是 MapReduce 对文件进行处理和运算的输入单位,只是一个逻辑概念,每个 InputSplit 并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的 path 和 hosts )和长度(由 start 和 length 决定)。默认情况下,InputSplit 的大小就是 HDFS 的 blockSize。

Split ——> RecordReader 阶段

针对每个 split,新建一个 RecorReader 读取 InputSplit 里面的数据,形成一个个 <Key,Value> 形式的记录(record),作为 Map 任务的输入。RecordReader 实例是由输入格式定义的,默认的输入格式为 TextInputFormat。

RecordReader ——> Map 阶段

RecordReader 决定了 map 任务如何读取输入数据。当一行记录比较长的话,可能会被切分到不同的 InputSplit, 但这并不会对Map造成影响,Map 能够到读取不同的 InputSplit,直到把这一行记录读取完成 。 如果不是 first split,则会丢弃第一个 record,避免了重复读取的问题。
Map 任务最终是交给 Map 任务执行器 org.apache.hadoop.mapreduce.Mapper 来执行的,Mapper 封装了应用程序 Map 阶段的数据处理逻辑,Map 任务对应的上下文执行环境 Context 是通过 RecordReader 来获取输入数据,通过 RecordWriter 保存被 Mapper 处理后的数据。
RecordReader 被重复地调用直到整个输入分片被处理完毕,每一次调用 RecordReader 都会调用 Mapper 的 map() 方法(用户定义的工作)。

Map ——> Partition 阶段

Mapper 任务划分数据的过程称作 Partition,MapReduce提供 Partitioner 接口,作用就是根据 key 或 value 及 reduce 的数量来决定当前的输出数据最终应该交由哪个reduce task 处理,一个reduce 任务对应一个分区的数据。这样做是为了避免有些 reduce 任务分配到大量数据,而有些reduce 任务却分到很少数据,甚至没有分到数据的尴尬局面。
partition 值默认计算公式 —— “hash(key)/reduce task的数量” 取余数。

Partioner ——> spill(sort & Combiner) 阶段

Hadoop 默认的排序算法,只会针对 key 值进行排序,按照字典顺序排序。map 端排序是为了减轻 reduce 端排序(reduce 阶段需要分组,将 key 相同的放在一起)的压力。
对每个分区中的数据进行排序,如果此时设置了combine,将排序后的结果进行combine操作,这样做的目的是让尽可能少的数据写入到磁盘。
combiner函数:hadoop 的一个针对 map 任务输出的优化函数,其结果会作为 reduce 任务输入。并不是所有任务都适合于该函数,一般该函数用于计算最大值,最小值等功能。由于该函数属于优化函数,其可能被调用0次到多次。其会使 map 输出更加紧凑。其本质上相当于一个本地 reduce操作。

spill——> reduce 阶段

reduce 会接收到不同 map 任务传来的数据,并且每个 map 传来的数据都是有序的。如果reduce端接收的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merg.percent决定),则对数据合并后溢写到磁盘中。
在reduce 阶段,每个 reduce task 会对收到的数据排序,数据按照 key 分成了若干组,之后以组为单位交给 reduce 函数处理。
reduce 函数的任务是将输入的一系列具有相同键的值以某种方式组合起来。通过接收到的数据,针对每一个<key, (list of values)>会调用一次用户自定义的 reduce 方法进行处理,得到新的 <key,value> 对,并作为结果输出。

reduce ——> OutputFormat ——> HDFS 阶段

输出被直接写到输出文件系统,一般是HDFS。