MapReduce 之shuffle过程
我们知道在MapReduce程序在map阶段和reduce阶段之间,会进行shuffle操作。那么我们来详细分析一下shuffle的过程或者原理
在MapTask调用Mapper#map方法之前,会构造一个RecordWriter对象,如果Job没有reduce操作,那么new一个NewDirectOutputCollector
如果包含Reduce操作,就new一个NewOutputCollector操作
然后把这个Writer封装在MapContext中,当map阶段某一个map任务完成,就会调用MapContext#write方法。
Write方法又会调用MapOutputCollector#collect方法:
publicvoidwrite(Kkey, Vvalue)throwsIOException, InterruptedException {
collector.collect(key,value,partitioner.getPartition(key,value,
partitions));
}
具体的实现是由MapOutputBuffer来实现的。
这时候Map阶段Shuffle开始:
一 Map阶段的shuffle分析
1分区
一个Map任务完成之后,会进行分区。
1.1我们使用的是什么类来分区呢?首先会判断我们是否在配置文件配置了mapreduce.job.reduces参数,如果没有设置或者设置为1,那么我们就创建一个分区数为0的Partitioner。如果这个参数> 1,如果我们自己在代码设置Partitioner,那么就构造一个我们自己的Partitioner,如果没有,那么就构造一个默认的HashPartitioner。
1.2怎么分区呢?
Map任务结束后,每一个任务都会构造一个内存缓冲区kvbuffer。这个内存缓冲区是一个环形的数据结构,本质是一个字节数组。在初始化的时候就会构造这个字节数组:
kvbuffer= new byte[maxMemUsage];
bufvoid= kvbuffer.length;
kvmeta= ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
setEquator(0);
bufstart= bufend = bufindex = equator;
kvstart= kvend = kvindex;
这儿有几个概念:
Kvbuffer:默认大小是100M,我们也可以自己调整这个缓冲区大小,参数是:mapreduce.task.io.sort.mb
kvmeta:存放索引的元数据信息
kvstart:开始下标
kvend:在spill开始的时候,它会被赋值为kvindex,spill结束,又被赋值为kvstartz,这时候kvstart=kvend。即只要kvstart!= kvend就表示正在spill,否则表示普通状态
kvindex:下一个可以记录的索引位置
最开始这三个值默认都是一样的
bufvoid:实际使用的缓冲区
bufmark:用于标记记录的结尾
bufIndex:初始值为0,指缓冲区增长到哪儿了
在kvindex和bufIndex之间的那部分就是还未溢写的数据,如果只要这部分数据超过80%,就会启动spill操作
流程:
#首先判断剩余的内存缓冲区是否大于0,而且现在是否处于spill阶段,如果剩余的内存缓冲区>0且不处于spill阶段,那么我们就把结果往内存缓冲区写。
#如果写入缓冲区的数据超过了阀值,默认80%就会启用spill程序
#SpillThread是一个线程类,专门负责溢写数据到磁盘,如果没有溢写发生,就一直处于等到状态,否则进行排序和溢写,调用sort
AndSpill方法
#根据分区数目创建SpillRecord
#然后调用getSpillFileForWrite获取HDFS文件路径,会生成一个带有编号的文件,比如output/spill{spillNumber }.out
lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT+ "/spill" + spillNumber + ".out", size, getConf());
#指定一个排序策略HeapSorter/QuickSorter进行排序,默认是QuickSorter
#构造一个IFile.Writer对象,然后输出流输出到指定文件
writer= new Writer<K, V>(job, partitionOut, keyClass,valClass,
codec,spilledRecordsCounter);
这里的codec支持压缩,有助于性能提升。它会根据指定的压缩策略:
mapreduce.map.output.compress.codec。如果没有指定,默认就是
DefaultCodec。
#如果用户设置了Combiner,在溢写到文件之前,还会进行一次combine操作,它继承了Reducer类,本质就是一个Reducer类。然后调用Combiner#combine操作,而后就会调用Reducer#run方法。
#将元数据信息写入内存索引数据结构:SpillRecord.如果内存中索引大于1MB,则写到文件名类似output/spillN.out.index文件,N就是当前spill的次数
#最后RecordWriter在close的时候会去merge当前map 任务产生的那些临时文件,最后会创建file.out和file.out.index文件用来存储最终的输出和索引
二 Reduce的shuffle操作
在Map 任务结束以后,Reduce就要开始从运行Map任务那些节点上复制内存中或者磁盘上的数据。然后再进行合并排序操作。
2.1copy阶段
首先会组装一个ShuffleConsumerPlugin插件,并对他进行初始化
然后调用其run方法,这个插件开始运行
#构造一个EventFetcher线程对象:它主要就是获取已经完成的Map任务事件,然后遍历事件,然后把主机名和URL放进一个集合
#获取进行复制的Fetcher数目,如果是本地就1个,如果是远程取决于mapreduce.reduce.shuffle.parallelcopies参数,默认是5
#然后每一个Fetcher开始工作,他们从之前保存的Host的map集合里获取主机名和URL,然后进行拷贝
#拷贝完毕关闭线程资源