hadoop入门之mapreduce shuffle与yarn原理(五)

标签(空格分隔): hadoop


1 概念

shuffle是什么?先说map reduce mapreduce本身是一个,map处理中间数据,之后传给reduce阶段完成数据的归并处理。
那么shuffle的定义:就是maptask数据与reduce task数据的传递流程,称之为shuffle。

2 shuffle总体架构

2.1 shuffle总体架构图

hadoop入门之mapreduce shuffle与yarn原理(五)
shuffle是从map端输出到reduce端输入的一个中间虚线就是一个我画就是大体概括的一个过程。

2.2 大体流程

2.2.1 涉及源码类

这里先说下我job任务提交涉及到的一些源码类(LocalJobRunner),之后再说下shuffle原理。
job.submit是提交入口,实际会调用jobSubmit.submitJobInternal。
jobSubmit会实现提交数据,RPC交互则jobsubmit实际上委托Cluster而Cluster依赖于ClientProtocol 这个协议,ClientProtocol具体就是有两个实现类,LocalJobRunner和YarnJobRunner前者本地后者通过yarn跑任务,暂时看的是本地部分的源码。通过LocalJobRunner.Job内部类完成start.这个start主要是获取对应的splictInfo来创建不同的线程任务具体运行时MapTaskRunnable这个实现类。内部是调用MapTask的run方法,会创建对应的相关类实例,
比如out(输出) if reducetaskNum=0 创建DataOutPutStream否则会创建MapOutputCollector的具体实现类MapOutputBuffer 。
MapOutputBuffer这个实现类里面会有spillThread完成排序溢出,BlockingBuffer作为缓冲区,spillThread在溢出之前会使用MapOutputBuffer创建的QuickSort完成排序,在最后maptask调用close方法的时候,会调用MapOutputBuffer的flush方法 这个方法会将之前溢出的文件使用Merge排序进行合并成一个文件。在这个过程中会涉及到一些Context主要是用于存储上下文需要用到的一些实例。

2.2.2 shuffle流程说明

shuffle流程描述:
接下来说下涉及的原理:
1.MapOutPutCollector完成write输出,在输出之前内部会进行排序和溢出的操作,collecotr会创建具体的排序实例,作为SpillThread的排序算法,SpillThread里面维护了sortAndSpill方法完成排序及溢出。
2.collector直接输出的是一个BlockingBuffer的环形缓冲区,spillThread会根据是否到阀值溢出到一个个溢出文件中。
2.1(溢出区域)这里有两个组件我们可以自定义,一个自定义的partition根据我们定义完成溢出到不同的partitioin,还有就是Conbiner在spillThread溢出前根据是否有conbiner做一下溢出前的合并包装操作。
3.maptask运行完后调用close方法,调用colletor.flush方法完成溢出文件的合并操作。合并到一个文件
4.之后会创建ReduceTask完成map数据的根据分区fetch 再进行一次Merge排序,然后调用具体的reduce方法完成reduce操作。
4.1这里reduce的调用时一组数据调用一次,所以我们可以设置GroupingComparatorClass 完成对应的分组,根据*策略完成reduce调用。

2.2 shuffle各组件

Partition 默认是:HashPartitioner<K, V> 我们可以自定义去设置分区
Conbiner: 1.这个类是map端的reducer所以需要继承Reducer,在spill溢出前会调用,之后合并的时候也会调用。
GroupingComparatorClass:这个类是RawComparator的子类,我们通过定义这个类完成reduce端的 key分组。

2.3小文件优化策略

补充下这个与shuffle没什么关系,主要是在map读取数据的时候做的,如果文件过小但由于一个文件会有一个FileSplict所以我们可以将小文件作为合并策略。
通过CombinerFileInputFormat我们可实现合并多个小文件为一个Splict减少Map任务量,由于MapTask每次起都需要消耗资源,如果频繁起而本身任务没有什么数据量这样的话效率太低。

3 Mapreduce与yarn

3.1 yarn是什么

yarn是一个任务调度系统,本身与mapreduce是解耦。
yarn通过resourceManage作为主管nodemanage负责具体程序运行资源分配
所以yarn可以运行各种分布式程序。例如:storm spark mapreduce等。

3.2提交流程

3.2.1 流程图hadoop入门之mapreduce shuffle与yarn原理(五)

流程说明:
1.rpc调用ResourceManage要创建一个job
2.resourcemanage返回一个stogingDir +jobid的资源存储路径 hdfs://…/jobid
3.提交序列化job.xml job.jar splict.info metasplict等文件到返回的地址上
4.通知rm创建资源提交完成,让rm创建任务到任务队列中创建容器资源
5.nodemanage领取到任务
6.创建AppMaster容器
7.下载job资源到容器内运行
8.通知rm创建maptask容器的任务
9.其他nodenamage领取任务
10.appmaster启动maptask
11.maptask阶段完成后,appmaster通知rm创建reducetask容器用于运行reduce
12.nodemanage领取任务创建reduce运行所需容器
13.appmaster通知运行reducetask
14.reducetask获取maptask运行完成的结果数据,运行reducetask。

4.本地运行模式&yarn运行模式

4.1 本地运行模式

主要通过
cnof.set(“mapreduce.framework.name”,“local”);
至于文件系统如果要设置就可以设置
conf.set(“fs.defaultFS”,“hdfs://hadoop1:9000”);
或者默认不设置也行 不设置则运行file:// 是本地文件系统

4.2本地运行yarn运行模式

cnof.set(“mapreduce.framework.name”,“yarn”);
conf.set(“yarn.resourcemanager.hostname”,"${rmhost}")
conf.set(“fs.defaultFS”,“hdfs://hadoop1:9000”);
由于window和linux的环境变量不一样,所以直接运行不成功,如果可以最好是通过linux完成提交。这样在本地就能直接提交任务到yarn上。
如果有需要可以自己去改下环境变量的转换。