图解Spark的任务调度机制


由于在实际开发中都是使用yarn-cluster模式,所以本文也以该模式为分析的前提。

1. 相关概念

首先明确几个在任务调度方面的常用概念:
(1)Job 是以 Action 算子为界,遇到一个Action算子则触发一个Job;
(2)Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle )为界,遇到 Shuffle 做一次划分;
(3)Task 是 Stage 的子集,以并行度(分区数)来衡量,这个 Stage 分区数是多少,则这个Stage 就有多少个 Task。
另外在任务调度过程中,有两个非常重要的调度器:DAGScheduler和TaskScheduler。
(1)DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。
(2)TaskScheduler负责Task级的调度,将DAGScheduler传过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。

2. 任务调度

接下来我们来具体分析一下Spark的任务调度流程:

2.1 Stage级别任务调度:

概略来说涉及到以下方法:
图解Spark的任务调度机制
以下是基于源码执行流程的具体分析:
图解Spark的任务调度机制
(1)Job 由最终的RDD和Action方法封装而成;
(2)SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断通过依赖回溯判断父依赖是否是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,可以进行pipeline式的计算。
(3)划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,另一类叫做ShuffleMapStage,为下游Stage准备数据

2.2 Task级别的调度

Spark Task 的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到TaskSet交给TaskScheduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中,TaskSetManager结构如下图所示:
图解Spark的任务调度机制
TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。

简要的过程如下:
图解Spark的任务调度机制
基于源码分析的详细过程如下:
图解Spark的任务调度机制
将TaskSetManager加入rootPool调度池中之后,调用SchedulerBackend的riviveOffers方法给driverEndpoint发送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤出活跃状态的Executor(这些Executor都是任务启动时反向注册到Driver的Executor),然后将Executor封装成WorkerOffer对象;准备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上分配task。

2.2.1 Task的调度策略:

2.2.1.1 FIFO调度策略:

如果是采用FIFO调度策略,则直接简单地将TaskSetManager按照先来先到的方式入队,出队时直接拿出最先进队的TaskSetManager,其树结构如下图所示,TaskSetManager保存在一个FIFO队列中。
图解Spark的任务调度机制

2.2.1.2 Fair调度策略:

图解Spark的任务调度机制
FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着所有待分配的TaskSetMagager。
在FAIR模式中,需要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质,因此使用相同的排序算法。
排序过程的比较是基于Fair-share来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare值以及weight值。
注意,minShare、weight的值均在公平调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相关配置。

  1. 如果 A 对象的runningTasks大于它的minShare,B 对象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks 比 minShare 小的先执行)
  2. 如果A、B对象的 runningTasks 都小于它们的 minShare,那么就比较 runningTasks 与 math.max(minShare1, 1.0) 的比值(minShare使用率),谁小谁排前面;(minShare使用率低的先执行)
  3. 如果A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasks与weight的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行)
  4. 如果上述比较均相等,则比较名字。
    整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare使用率和权重使用率少(实际运行task比例较少)的先运行。
    FAIR模式排序完成后,所有的TaskSetManager被放入一个ArrayBuffer里,之后依次被取出并发送给Executor执行。
    从调度队列中拿到TaskSetManager后,由于TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。

2.2.2 本地化调度

DAGScheduler切割Job,划分Stage, 通过调用submitStage来提交一个Stage对应的tasks,submitStage会调用submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用getPreferrdeLocations()得到partition的优先位置,由于一个partition对应一个Task,此partition的优先位置就是task的优先位置,
对于要提交到TaskScheduler的TaskSet中的每一个Task,该Task优先位置与其对应的partition对应的优先位置一致。
从调度队列中拿到TaskSetManager后,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到,TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task。 根据每个Task的优先位置,确定Task的Locality级别,Locality一共有五种,优先级由高到低顺序:
图解Spark的任务调度机制
在调度执行时,Spark 调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。
可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在一定程度上提升了运行性能

2.2.3 失败重试和黑名单

除了选择合适的Task调度运行外,还需要监控Task的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败。
在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,
“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task了。