MapReduce:作业运行机制

目录

作业提交

作业初始化

任务分配

任务执行

关于进度监控

作业完成


 

MapReduce应用实际上是以YARN应用运行,若理解了YARN运行机制,MR不过是多了一些细节处理

MapReduce作业运行的整个过程中有5个独立的实体:

  • 客户端:调用Job对象的submit()方法提交作业,或者调用waitForComplete()提交之前没有提交过的作业并等待它的完成
  • YARN Resource Manager:负责协调集群上的资源分配,启动Application Master
  • YARN Node Manager:启动和监控管理机器上的容器
  • MRApplication Master:MapReduce的Application Master,负责协调运行MR作业的任务
  • 分布式文件系统HDFS:用来与其他实体间共享作业文件

 

流程图

MapReduce:作业运行机制

作业提交

(步骤1)Job类对象的submit()方法内部会创建一个JobSubmiter对象,并调用其submitJobInternal()方法。提交作业后通过waitForComplete()每秒轮询作业的进度,实际是检查各个计数器,若发现自上次报告有变化就把进度发送给控制台,那么作业成功完成后就会显示作业计数器。

JobSubmiter中作业提交过程的实现:

(步骤2)先向RM请求一个新应用ID,作为MR作业ID

检查作业的输出路径,如是否指定了输出目录或输出目录是否已存在

计算作业的输入分片,若分片无法计算,则可能因为输入路径不存在,作业也就不会提交

(步骤3)将运行作业的容器环境(即所需要资源,如作业JAR文件、配置文件和计算所得的输入分片)复制到一个以作业ID命令的目录下的共享文件系统中。作业JAR的副本数量由mapreduce.client.submit.file.replication属性决定,默认为10,多个副本以供集群中的多个NM进行访问

(步骤4)调用RM的submitApplication()方法提交作业

作业初始化

(步骤5a)RM收到调用它submitApplication()方法消息后,调度器会分配一个容器并在集群中选择一个NM,RM在这个NM中启动容器

(步骤5b)并在NM管理下在容器中开启Application Master的进程。MR作业的Application Master是一个Java应用,其主类是MRAppMaster

(步骤6)MRAppMaster初始化job,由于需要管理task的进度和完成报告,每个task对应一个(猜的,此时还不知道有多少个task)簿记对象(bookkeeping,即一些计数器)来记录它们的进度,最终反映的是整个job的进度。

(步骤7)MRAppMaster接受来自共享文件系统中的输入分片,然后对每一个分片创建一个MapTask对象,属性mapreduce.job.reduces或Job的serNumReduceTask()方法决定了ReduceTask的数量。任务ID在此时分配。

MRAppMaster决定了如何运行这个MR作业中的各个任务。如果任务很小,即在新的容器中分配和运行任务的开销大于并行运行它们的开销,就会将整个作业在和自己同一个JVM中运行,会先运行MapTask再运行ReduceTask,这样的作业称为uberized,即uber模式。

关于小作业:默认情况下,10个Mapper以下且只有1个Reducer且输入大小小于一个HDFS块的作业为小作业,对应属性为mapreduce.job.ubertask.maxmaps、mapreduce.job.ubertask.maxreduces和mapreduce.job.ubertask.maxbytes。并且需要将属性mapreduce.job.ubertask.enable设置为true以明确开启Uber模式。

在任务运行之前,MRAppMaster会调用setupJob()方法创建作业的最终输出目录和任务输出的临时工作文件

任务分配

(步骤8)若作业不适合以Uber模式运行,那么MRAppMaster会为作业中的所有task向RM申请容器,一个task对应一个容器。首先MapTask发出申请请求,MapTask的优先级要高于ReduceTask。当有5%的MapTask已经完成时ReduceTask的请求才会发起。ReduceTask在集群中运行的位置是随意的,但调度器会尽量让MapTask数据本地化。

默认情况下,MapTask和ReduceTask申请到的容器限制为1G内存和1个VCore

任务执行

(步骤9a、9b)RM收到了task的资源申请,由调度器分配指定节点上的容器,然后MRAppMaster通过与指定NM通信来启动容器(可以说AM的容器是由RM启动的,而task的容器是由AM启动的)

任务由主类为YarnChild的一个Java应用程序执行

(步骤10)当然在执行任务之前,首先需要将所需资源本地化,会从共享文件系统中获取如作业的配置、JAR文件和分布式缓存文件等

(步骤11)运行MapTask或ReduceTask

Streaming运行用户提供的map任务和reduce任务,并与之通信

MapReduce:作业运行机制

Streaming任务使用标准I/O流与进程进行通信,在任务执行过程中,Java程序都会将KV对传给外部的进程,外部进程通过用户自定义的map函数和reduce函数来执行并将KV队传回给该任务的Java进程

关于进度监控

当MapTask或ReduceTask运行时,对应的子进程会与自己的Application Master通过umbilical接口通信。每隔3秒,这些task会向自己的Application Master报告进度和状态,包括计数器。Application Master会形成一个作业的汇聚视图。

而RM中的Application Manager界面显示了所有运行中的作业,并且分别有链接指向这些作业各自的Application Master界面。

在作业期间,客户端每秒轮询一次Application Master获取最近的状态;也可以通过Job的getStatus()方法得到一个JobStatus实例,包括了作业的所有状态信息。

MapReduce:作业运行机制

作业完成

在客户端进行轮询状态时,当Application Master收到作业最后一个任务已完成的通知后,便把作业的状态设置为成功,于是Job从waitForComplete()方法返回。Job的统计信息和计数值也在控制台输出完毕。

然后Application Master和任务的容器清理其工作状态,作业信息由作业历史服务器存档。

 


参考资料:《Hadoop权威指南 第4版》