YARN提交MR程序流程以及MR调优

YARN提交MR程序流程

YARN主要由ResourceManagerNodeManagerApplicationMasterContainer等组件构成,如图4-23所示。

YARN提交MR程序流程以及MR调优

图4-23 Yarn基本架构

Yarn工作机制

1.Yarn运行机制,如图4-24所示。

YARN提交MR程序流程以及MR调优

 

图4-24  Yarn工作机制

2.工作机制详解

        1MR程序提交到客户端所在的节点。

        2YarnRunnerResourceManager申请一个Application

        3RM将该应用程序的资源路径返回给YarnRunner

        4)该程序将运行所需资源提交到HDFS上。

        5)程序资源提交完毕后,申请运行mrAppMaster

        6RM将用户的请求初始化成一个Task

        7)其中一个NodeManager领取到Task任务。

        8)该NodeManager创建容器Container,并产生MRAppmaster

        9MRAppmasterHDFS上拷贝资源到本地。

        10MRAppmasterRM 申请运行MapTask资源。

        11RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

        12MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTaskMapTask对数据分区排序。

13MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask

        14ReduceTaskMapTask获取相应分区的数据。

        15)程序运行完毕后,MR会向RM申请注销自己。

5.4 作业提交全过程

1.作业提交过程之YARN,如图4-25所示。

YARN提交MR程序流程以及MR调优

 

图4-25 作业提交过程之Yarn

作业提交全过程详解

1)作业提交

1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。

2步:ClientRM申请一个作业id

3步:RMClient返回该job资源的提交路径和作业id

4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。

5步:Client提交完资源后,向RM申请运行MrAppMaster

2)作业初始化

6步:当RM收到YarnClient的请求后,将该job添加到容量调度器中。

7步:某一个空闲的NM领取到该Job

8步:该NM创建Container并产生MRAppmaster

9步:下载Client提交的资源到本地。

3)任务分配

10步:MrAppMasterRM申请运行多个MapTask任务资源。

11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

4)任务运行

12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTaskMapTask对数据分区排序。

13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask

14步:ReduceTaskMapTask获取相应分区的数据。

15步:程序运行完毕后,MR会向RM申请注销自己。

5)进度和状态更新

YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。

6)作业完成

除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

2.作业提交过程之MapReduce,如图4-26所示

YARN提交MR程序流程以及MR调优

 

图4-26 作业提交过程之MapReduce

 

MapReduce工作流程

1.流程示意图,如图4-6,4-7所示

YARN提交MR程序流程以及MR调优

图4-6  MapReduce详细工作流程(一)

细节:环形缓冲区到达80%时,spill溢写线程根据key大小把对应的索引排序,然后根据索引的顺序把对应的数据写出到磁盘,写入线程继续往剩下的20%写,当spill完成后,会把index区的数据复制到data区到达80%的起点,然后开始背靠背反向写入。

YARN提交MR程序流程以及MR调优

图4-7  MapReduce详细工作流程(二)

2.流程详解

上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:

1MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3)多个溢出文件会被合并成大的溢出文件

4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

5ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

6ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序),并不会合并相同keyvalue值。

7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法,)。

千万要注意的细节:MapReducereduce会循环遍历每一个kv键值对,当遇到不同的keykeyGroupingComparator定义的,默认是完全相同的key进入同一个reduce,可以自定义为根据几个字段相同就看作是相同的key)就重新进入reduce方法。

3.注意

Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M

4.源码解析流程

context.write(k, NullWritable.get());

output.write(key, value);

collector.collect(key, value,partitioner.getPartition(key, value, partitions));

        HashPartitioner();

collect()

        close()

        collect.flush()

sortAndSpill()

    sort()   QuickSort

mergeParts();

    YARN提交MR程序流程以及MR调优

collector.close();

3.3 Shuffle机制

3.3.1 Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。如图4-14所示。

YARN提交MR程序流程以及MR调优

 

图4-14  Shuffle机制

3.3.2 Partition分区

YARN提交MR程序流程以及MR调优

YARN提交MR程序流程以及MR调优

YARN提交MR程序流程以及MR调优

 MapTask工作机制

MapTask工作机制如图4-12所示。

YARN提交MR程序流程以及MR调优

 

图4-12  MapTask工作机制

        1Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value

        2Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value

        3Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

        4Spill阶段:即溢写,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

        溢写阶段详情:

        步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

        步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.outN表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

        步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

        5Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

        当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index

        在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

 

ReduceTask工作机制

ReduceTask工作机制,如图4-19所示。

YARN提交MR程序流程以及MR调优

图4-19 ReduceTask工作机制

        (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

        (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

        (3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

        (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

 

MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

数据输入

YARN提交MR程序流程以及MR调优

6.2.2 Map阶段

YARN提交MR程序流程以及MR调优

 

6.2.3 Reduce阶段

 

YARN提交MR程序流程以及MR调优

YARN提交MR程序流程以及MR调优

6.2.4 I/O传输

YARN提交MR程序流程以及MR调优

 

6.2.5 数据倾斜问题

YARN提交MR程序流程以及MR调优

YARN提交MR程序流程以及MR调优

6.2.6 常用的调优参数

1.资源相关参数

(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)

表4-12

配置参数

参数说明

mapreduce.map.memory.mb

一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。

mapreduce.reduce.memory.mb

一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。

mapreduce.map.cpu.vcores

每个MapTask可使用的最多cpu core数目,默认值: 1

mapreduce.reduce.cpu.vcores

每个ReduceTask可使用的最多cpu core数目,默认值: 1

mapreduce.reduce.shuffle.parallelcopies

每个Reduce去Map中取数据的并行数。默认值是5

mapreduce.reduce.shuffle.merge.percent

Buffer中的数据达到多少比例开始写入磁盘。默认值0.66

mapreduce.reduce.shuffle.input.buffer.percent

Buffer大小占Reduce可用内存的比例。默认值0.7

mapreduce.reduce.input.buffer.percent

指定多少比例的内存用来存放Buffer中的数据,默认值是0.0

(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)

表4-13

配置参数

参数说明

yarn.scheduler.minimum-allocation-mb  

给应用程序Container分配的最小内存,默认值:1024

yarn.scheduler.maximum-allocation-mb         

给应用程序Container分配的最大内存,默认值:8192

yarn.scheduler.minimum-allocation-vcores  

每个Container申请的最小CPU核数,默认值:1

yarn.scheduler.maximum-allocation-vcores  

每个Container申请的最大CPU核数,默认值:32

yarn.nodemanager.resource.memory-mb  

给Containers分配的最大物理内存,默认值:8192

(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)

表4-14

配置参数

参数说明

mapreduce.task.io.sort.mb  

Shuffle的环形缓冲区大小,默认100m

mapreduce.map.sort.spill.percent  

环形缓冲区溢出的阈值,默认80%

2.容错相关参数(MapReduce性能优化)

表4-15

配置参数

参数说明

mapreduce.map.maxattempts

每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

mapreduce.reduce.maxattempts

每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

mapreduce.task.timeout

Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

6.3 HDFS小文件优化方法

6.3.1 HDFS小文件弊端

HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间另一方面就是索引文件过大使得索引速度变慢。

6.3.2 HDFS小文件解决方案

小文件的优化无非以下几种方式:

(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。

(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。

(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。

YARN提交MR程序流程以及MR调优

YARN提交MR程序流程以及MR调优