第7章 MapReduce的工作机制

第7章 MapReduce的工作机制

1、 作业的提交

可通过调用Job对象的submit()方法,也可以调用waitForCompletion()方法。

Job的submit()方法创建了一个内部的JobSummiter实例,并调用其submitJobInternal()方法。提交作业后,waitForCompletion()轮询进度,有变化就打印到控制台。作业完成后,若成功,显示作业计数器,失败则显示失败的错误记录。

JobSummiter实现作业提交过程:

  1. 向资源管理器请求一个新应用ID用于Mapreduce的ID。
  2. 检查作业输出说明,例如若没有输出目录或已存在,作业不提交,返回错误。
  3. 计算作业分片,出现错误就不提交,返回错误
  4. 将运行作业所需要的资源(暴扣作业Jar,配置文件和分片文件)复制到一个以作业ID命名的目录下的共享文件系统中。
  5. 最后通过调用资源管理器的submitApplication()方法提交作业。

 

2、作业的初始化

   资源管理器调用submitApplication后,将请求传递给Yarn调度器。调度器分配一个容器,然后在节点管理器的容器中启动application master进程。

   Application master主类为MRAppMaster。接受来自共享文件系统的资源文件。然后一个分片建立一个map任务以及确定多个reducer任务对象。任务ID分配。

   小作业将会被作为uber任务,运行于application master同一个JVM上。

 

3、任务分配

若作业不适合uber任务运行,则application master会为该作业所有map和reduce任务向资源管理器申请容器。

 

4、任务执行

一旦为任务分配了特定节点的容器,applicationmaster通过于节点管理器通信来启动容器。该任务由主类为YarnChild的一个java应用程序执行。然后进行资源本地化,最后运行任务。

 

5、进度和状态的更新

任务运行时,对其进度进行跟踪。对于map任务:进度就是已处理输入所占比例。对于reducer任务,有三个阶段(赋值,排序,运行reduce任务)各占1/3.

第7章 MapReduce的工作机制

 

6、作业完成

当application master收到作业最后一个任务完成的通知后,将作业状态设为true。然后再Job轮询状态时,便知道任务已成功完成,Job打印一条消息告知用户,然后从waitForCompletion()方法返回。Job的各种统计信息也输出到控制台。

 

7、失败

(1) 任务运行失败

用户代码抛异常---任务JVM向application master发送错误报告并退出。Application master将此次任务尝试标记为failed,释放容器。

JVM突然异常退出---节点管理器通知application master将此次任务尝试标记为失败。

长时间运行任务超时失败

 

任务失败后,application master会重新再调度任务执行,默认4次。还可以设置再不处罚任务失败的情况下允许任务失败的最大百分比,即允许任务失败,但不会种植运行整个作业。

(2) application master运行失败

       无论是Yarn application master和MapReduce application master,默认失败尝试次数都为2次。

       对于MapReduce application master,若其运行失败,将使用作业历史来恢复失败的应用程序所运行的任务状态,使其不必重新运行。默认情况恢复功能是开启的(yarn.app.mapreduce.am.job.recovery.enable)。

  1. 节点管理器运行失败
  1. 节点管理器和资源管理器通过心跳机制维持通信。若资源管理器10分钟内(yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms)没有收到某个节点管理器的心跳信息。资源管理器会认为节点管理器有问题,并通知停止该节点管理器同时将其从自己的节点池中移除以调度启动容器。
  2. 若在某节点管理器中应用程序的运行失败次数过高,那么即使该节点管理器自己并没有失败过,也会被资源管理器拉黑。黑名单由application master管理,对于MapReduce,如果一个节点管理器上有超过三个任务(mapreduce.job.maxtaskfailures.per.tracker)失败,application master会尽量将任务调度到不同节点。
  1. 资源管理器运行失败

 

8 shuffle和排序

       Shuffer即是将map输出作为输入传入给reducer的过程。

  1. map端

每个map任务都有一个环形内存缓冲区用于存储任务输出。默认大小是100MB(mapreduce.task.io.sort.mb)。一旦缓冲内容达到阈值(mapreduce.map.sort.spill.percent,默认0.8),一个后台线程就开始将内容溢出到磁盘中。当然,在溢出文件到磁盘之前,还需要对溢出数据进行分区和排序,还有可能运行Combiner,使溢出文件内容更紧凑,以此减少写到磁盘的数据和传递给reuder的数据。最后多个溢出文件合并成一个文件,保证每个map任务输出一个文件。

  1. reducer端

每个reduce任务需要集群上若干个map任务的map输出作为其特殊分区的文件。Reducer会启功复制线程进行输入内容的复制。默认5个线程。Map输出小的话,可以缓存在reduce任务的缓冲区中,但也会溢出到磁盘。当复制完所有mao输出以后,reduce任务进入排序阶段(归并排序)并且合并。最后是reduce阶段,直接将数据输入reduce函数。

 

9 推测执行

       在MapReduce模型中,将一个作业分为多个任务进行执行,其中可能出现少数拖后腿的任务。Hadoop不会尝试诊断或修复执行慢的任务,而是尽量检测并启动另一个相同的任务作为备份,这就是所谓的“推测执行”。

       推测执行是一种优化措施,并不能使作业运行更可靠。默认是开启的。