The Beauty of Spark Internals (3): Principles and Source Code Analysis of the DAGScheduler

1. Introduction

RDDs are lazily evaluated: a transformation operator does not execute anything by itself, and computation only starts when an action operator is invoked. Based on the chain of transformations that precede the action, Spark builds a DAG; in the steps that follow, the DAG is divided into Stages, Tasks are generated, and the Tasks are finally run.

Table 1: RDD transformations and actions supported in Spark

Transformations:

map(f : T ⇒ U) : RDD[T] ⇒ RDD[U]
filter(f : T ⇒ Bool) : RDD[T] ⇒ RDD[T]
flatMap(f : T ⇒ Seq[U]) : RDD[T] ⇒ RDD[U]
sample(fraction : Float) : RDD[T] ⇒ RDD[T] (Deterministic sampling)
groupByKey() : RDD[(K, V)] ⇒ RDD[(K, Seq[V])]
reduceByKey(f : (V, V) ⇒ V) : RDD[(K, V)] ⇒ RDD[(K, V)]
union() : (RDD[T], RDD[T]) ⇒ RDD[T]
join() : (RDD[(K, V)], RDD[(K, W)]) ⇒ RDD[(K, (V, W))]
cogroup() : (RDD[(K, V)], RDD[(K, W)]) ⇒ RDD[(K, (Seq[V], Seq[W]))]
crossProduct() : (RDD[T], RDD[U]) ⇒ RDD[(T, U)]
mapValues(f : V ⇒ W) : RDD[(K, V)] ⇒ RDD[(K, W)] (Preserves partitioning)
sort(c : Comparator[K]) : RDD[(K, V)] ⇒ RDD[(K, V)]
partitionBy(p : Partitioner[K]) : RDD[(K, V)] ⇒ RDD[(K, V)]

Actions:

count() : RDD[T] ⇒ Long
collect() : RDD[T] ⇒ Seq[T]
reduce(f : (T, T) ⇒ T) : RDD[T] ⇒ T
lookup(k : K) : RDD[(K, V)] ⇒ Seq[V] (On hash/range partitioned RDDs)
save(path : String) : Outputs RDD to a storage system, e.g., HDFS

When RDDs are transformed, the dependencies between RDDs fall into two categories: narrow dependencies and wide dependencies.

(1) Narrow dependencies: each partition of the child RDD depends on a constant number of parent partitions (i.e., independent of the data size);

(2) Wide dependencies: each partition of the child RDD may depend on all partitions of the parent RDD. For example, map produces a narrow dependency, while join produces a wide dependency (unless the parent RDDs are hash-partitioned).

With a narrow dependency, the partitions of the child RDD can be produced in parallel; with a wide dependency, computation can only proceed after the shuffle output of all parent partitions is available.

In a chain of RDD transformations, consecutive narrow-dependency transformations can be executed in parallel, whereas an operation involving a wide dependency cannot run until the shuffle output of all parent partitions is ready. Spark therefore uses wide dependencies as boundaries and divides a Job into multiple Stages. The transformation work inside one Stage can be abstracted as a TaskSet: a TaskSet contains many Tasks that all perform the same transformations, differing only in the subset of the data each one operates on.
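To make this concrete, here is a minimal, hedged sketch (the object name, app name, and local master URL are illustrative assumptions, not from the original article): a word count whose map step is a narrow dependency and whose reduceByKey step is a wide dependency, so the DAGScheduler cuts the job into a ShuffleMapStage and a ResultStage.

  // A minimal two-stage job sketch (assumes spark-core on the classpath).
  import org.apache.spark.{SparkConf, SparkContext}

  object TwoStageWordCount {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("two-stage-demo").setMaster("local[2]"))

      // Narrow dependency: map is pipelined with the input inside the first Stage
      // (a ShuffleMapStage); its TaskSet has one ShuffleMapTask per input partition (4 here).
      val pairs = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 4).map(w => (w, 1))

      // Wide dependency: reduceByKey introduces a ShuffleDependency, i.e. a Stage boundary.
      // The second Stage (a ResultStage) has one ResultTask per shuffle partition (2 here).
      val counts = pairs.reduceByKey(_ + _, 2)

      // The action triggers the job; only now does the DAGScheduler build and submit both Stages.
      counts.collect().foreach(println)
      sc.stop()
    }
  }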

The DAGScheduler analyzes the Application, divides it into Stages according to the dependencies between RDDs, generates a set of Tasks for each Stage, and submits each TaskSet to the TaskScheduler, which then has Executors run the tasks. The process is shown in the figure below:

[Figure: the DAGScheduler divides the job into Stages and submits each Stage's TaskSet to the TaskScheduler, which launches tasks on Executors]

The three most important classes in the task scheduling module are:

1. org.apache.spark.scheduler.DAGScheduler 

2. org.apache.spark.scheduler.SchedulerBackend 

3. org.apache.spark.scheduler.TaskScheduler 

Among them, the SchedulerBackend's main role is to allocate compute resources for Tasks. Let's first look at how SparkContext creates the DAGScheduler, SchedulerBackend, and TaskScheduler:

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

The TaskScheduler object must be created before the DAGScheduler object is constructed, because the DAGScheduler constructor obtains the TaskScheduler from the SparkContext that is passed in: def this(sc: SparkContext) = this(sc, sc.taskScheduler).
  Let's look at the relevant DAGScheduler source code:

class DAGScheduler(
    private[scheduler] val sc: SparkContext, // the current SparkContext
    private[scheduler] val taskScheduler: TaskScheduler, // the TaskScheduler held by the current SparkContext
    listenerBus: LiveListenerBus, // processes events asynchronously; obtained from sc
    mapOutputTracker: MapOutputTrackerMaster, // runs on the Driver and manages shuffle map task output; obtained from sc
    blockManagerMaster: BlockManagerMaster, // runs on the Driver and manages Block information for the whole Job; obtained from sc
    env: SparkEnv, // obtained from sc
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

  private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)

  private[scheduler] val nextJobId = new AtomicInteger(0) // generates Job ids
  private[scheduler] def numTotalJobs: Int = nextJobId.get() // total number of Jobs
  private val nextStageId = new AtomicInteger(0) // the next stageId
  // Maps a job to the set of all stages it contains
  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  // Maps a stageId to its Stage
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
  /**
   * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
   * that dependency. Only includes stages that are part of currently running jobs (when a job
   * requiring the shuffle stage completes, the mapping is removed, and the only record of the
   * shuffle data will be in the MapOutputTracker).
   */
  // Records the ShuffleMapStage for each shuffle, keyed by shuffleId
  private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
  // Records jobs in the Active state; key is the jobId, value is an ActiveJob
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  // Stages waiting to run, usually until their parent Stages finish
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages in the Running state
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that failed due to fetch failures and are waiting to be resubmitted
  private[scheduler] val failedStages = new HashSet[Stage]

  // The list of Jobs in the active state
  private[scheduler] val activeJobs = new HashSet[ActiveJob]
  // The object that processes scheduler events
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

...
}

After the DAGScheduler is constructed and an eventProcessLoop instance is initialized, eventProcessLoop.start() is called, which starts a dedicated event-processing thread; the various events are then posted to the eventProcessLoop.
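To make the post/dispatch pattern concrete, here is a minimal, self-contained sketch in the spirit of DAGSchedulerEventProcessLoop. It is not Spark source: Spark's real implementation lives in org.apache.spark.util.EventLoop, and the class and event names below are simplified assumptions for illustration.

  import java.util.concurrent.LinkedBlockingDeque

  // Simplified stand-in for Spark's EventLoop: a daemon thread drains a queue
  // and dispatches each event to onReceive.
  abstract class MiniEventLoop[E](name: String) {
    private val eventQueue = new LinkedBlockingDeque[E]()
    @volatile private var stopped = false

    private val eventThread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped) {
            onReceive(eventQueue.take()) // blocks until an event is posted
          }
        } catch {
          case _: InterruptedException => // stop() interrupts a blocked take()
        }
      }
    }

    def start(): Unit = eventThread.start()
    def stop(): Unit = { stopped = true; eventThread.interrupt() }
    def post(event: E): Unit = eventQueue.put(event)

    protected def onReceive(event: E): Unit
  }

  // A toy scheduler event and loop, mirroring how JobSubmitted is routed to
  // handleJobSubmitted by doOnReceive in the real DAGSchedulerEventProcessLoop.
  sealed trait ToySchedulerEvent
  case class ToyJobSubmitted(jobId: Int) extends ToySchedulerEvent

  class ToySchedulerLoop extends MiniEventLoop[ToySchedulerEvent]("toy-scheduler-loop") {
    override protected def onReceive(event: ToySchedulerEvent): Unit = event match {
      case ToyJobSubmitted(jobId) => println(s"handleJobSubmitted($jobId)")
    }
  }

  object MiniEventLoopDemo {
    def main(args: Array[String]): Unit = {
      val loop = new ToySchedulerLoop
      loop.start()
      loop.post(ToyJobSubmitted(0))
      Thread.sleep(100) // give the event thread time to process the event
      loop.stop()
    }
  }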

2. Submitting a Job

A Job actually begins when an action is called on an RDD. The action eventually reaches org.apache.spark.SparkContext.runJob(); SparkContext has several overloaded runJob methods. Below we use count as the example action; its call stack is:

  • rdd.count 
    • SparkContext.runJob 
      • DAGScheduler.runJob 
        • DAGScheduler.submitJob 
          • DAGSchedulerEventProcessLoop.doOnReceive 
            • DAGScheduler.handleJobSubmitted

The source of rdd.count is:

 def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

SparkContext.runJob in turn calls DAGScheduler.runJob. The source of SparkContext.runJob is:

// SparkContext.runJob
def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

Once dagScheduler.runJob() is called here, execution formally enters the DAGScheduler object constructed earlier. Note that rdd.doCheckpoint() does not simply checkpoint this RDD: it recursively walks back through the parent RDDs, checks whether checkpointData has been defined, and checkpoints an RDD if it has:

 private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          if (checkpointAllMarkedAncestors) {
           // If we also want to checkpoint the parents of an RDD whose
           // checkpointData is defined, the parents must be checkpointed first,
           // because once an RDD checkpoints itself, it cuts its parents
           // out of the lineage.
            dependencies.foreach(_.rdd.doCheckpoint())
          }
          checkpointData.get.checkpoint()
        } else {
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }

For the concrete checkpoint implementation, see Section 5.3 of "The Beauty of Spark Internals (1): Principles and Source Code Analysis of RDDs".
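As a quick refresher on the user-facing side of checkpointing, here is a minimal hedged sketch (the checkpoint directory, app name, and object name are placeholder assumptions): mark the RDD before the action, and doCheckpoint() materializes it once the job triggered by the action finishes.

  import org.apache.spark.{SparkConf, SparkContext}

  object CheckpointSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[*]"))
      sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path

      val derived = sc.parallelize(1 to 100, 4).map(_ * 2).filter(_ % 3 == 0)
      derived.checkpoint() // only defines checkpointData; nothing is written yet

      // The action runs the job; afterwards rdd.doCheckpoint() walks the lineage
      // and writes the marked RDD to the checkpoint directory.
      println(derived.count())
      sc.stop()
    }
  }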

DAGScheduler.runJob in turn calls DAGScheduler.submitJob. The source of DAGScheduler.runJob is:

   //DAGScheduler.runJob
   /**
    * Parameters:
    * @param rdd            the target RDD to run the job on
    * @param func           the function to run on each partition of the RDD
    * @param partitions     the set of partitions to compute; some jobs do not compute
    *                       every partition of the RDD, e.g. first()
    * @param callSite       where in the user program this job was called
    * @param resultHandler  callback that receives each result
    * @param properties     scheduler properties for this job, e.g. the fair scheduler
    *                       pool name (covered later)
    */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
      waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

The source of DAGScheduler.submitJob is:

  //DAGScheduler.submitJob
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Make sure we are not running tasks on partitions that do not exist
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }
    // Get a new jobId by incrementing the counter
    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // If the job does not run on any partition,
      // return immediately
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

eventProcessLoop is an instance of DAGSchedulerEventProcessLoop, the listener that handles DAG scheduling events. Events posted to it are dispatched through doOnReceive, whose source is:

//DAGSchedulerEventProcessLoop.doOnReceive
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // When the event is JobSubmitted,
    // DAGScheduler.handleJobSubmitted is called.
    // Handle the job submission event
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    // Handle the map stage submission event
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    // Handle the stage cancellation event
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
    // Handle the job cancellation event
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
    // Handle the job group cancellation event
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    // Handle the all-jobs cancellation event
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    // Handle the executor-added event
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    // Handle the executor-lost event
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
    // Handle the task completion event
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    // Handle the task set failure event
    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
    // Handle the event that resubmits failed stages
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

Finally, the Job reaches DAGScheduler.handleJobSubmitted, and with that, job submission is complete.

//DAGScheduler.handleJobSubmitted
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)

    submitWaitingStages()
  }

3. Stage Division

Stage division relies on the concepts of wide and narrow dependencies: wide dependencies mark the Stage boundaries, while consecutive narrow dependencies belong to the same Stage.

[Figure: an example DAG in which RDD_A through RDD_G are divided into Stage1, Stage2, and Stage3]

From the figure above we can see:

1. RDD_G and RDD_B have a narrow dependency, so RDD_G and RDD_B are placed in the same Stage; but RDD_G and RDD_F have a wide dependency, so RDD_G and RDD_F are not placed in the same Stage.

2. RDD_B and RDD_A have a wide dependency, so RDD_B and RDD_A are not placed in the same Stage; since RDD_A has no parent RDD, it forms a Stage of its own.

3. RDD_F and RDD_D have a narrow dependency, so they are placed in the same Stage; likewise RDD_D and RDD_C, and RDD_F and RDD_E. Since RDD_C and RDD_E have no parent RDDs, the traversal ends there, and RDD_F, RDD_D, RDD_C, and RDD_E all end up in the same Stage.
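To see such boundaries on a concrete job (a hedged sketch; the names are illustrative and unrelated to the RDDs in the figure, and an existing SparkContext named sc is assumed), chain a narrow map with a wide reduceByKey and join; rdd.toDebugString indents the lineage at each ShuffleDependency, which is exactly where the DAGScheduler will cut Stages:

  // Assumes an existing SparkContext named sc.
  val words  = sc.parallelize(Seq("a", "b", "a", "c"), 2)
  val pairs  = words.map(w => (w, 1))     // narrow: no new Stage
  val counts = pairs.reduceByKey(_ + _)   // wide: introduces a ShuffleDependency
  val lookup = sc.parallelize(Seq(("a", 10), ("b", 20)), 2)
  val joined = counts.join(lookup)        // wide again: another shuffle boundary

  // Each "+-" indentation level in toDebugString marks a shuffle,
  // i.e. a place where a new Stage begins.
  println(joined.toDebugString)

  // The wide dependency can also be inspected directly:
  counts.dependencies.foreach(d => println(d.getClass.getSimpleName)) // ShuffleDependency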

The concrete Stage division process is as follows:

First, we want to create, via newResultStage, Stage3, the Stage containing RDD_G. Before the Stage is constructed, getParentStagesAndId is called, which in turn calls getParentStages to traverse, breadth-first, the RDDs that RDD_G depends on. A narrow-dependency parent is absorbed into G's Stage3; RDD_B, for example, is placed in Stage3.

If the dependency is wide, take RDD_F as the example (RDD_A is handled the same way): getShuffleMapStage is called to check whether Stage2, the Stage containing RDD_F, has already been created. If it has, it is returned directly; if not, getAncestorShuffleDependencies is called first.

getAncestorShuffleDependencies is similar to getParentStages: it also traverses, breadth-first, the RDDs that RDD_F depends on. Narrow-dependency ancestors such as RDD_C, RDD_D, and RDD_E are absorbed into F's Stage2. But suppose RDD_E had a parent RDD, say RDD_H, connected to RDD_E by a wide dependency. What then? We would first check whether the Stage containing RDD_H has already been created; if not, its shuffle dependency is pushed onto a parents Stack, which is returned at the end.

For the returned, not-yet-created Stages we call newOrUsedShuffleStage, which calls newShuffleMapStage to create the new Stage. The implementation of newShuffleMapStage is similar to newResultStage, so the process recurses: every Stage that a Stage depends on is created before the Stage itself. In this example, the Stage containing RDD_H would be created first, and then Stage2.

After newOrUsedShuffleStage creates the new Stage, it checks whether the Stage has already been computed. If it has, the existing results are copied from the mapOutputTracker; if not, a placeholder is registered with the mapOutputTracker. Finally, control returns to newResultStage, where the ResultStage (Stage3 here) is created. With that, the Stage division process is finished.

Spark uses wide dependencies as boundaries to divide a Job into multiple Stages. The call stack is:

  • DAGScheduler.newResultStage
    • DAGScheduler.getParentStagesAndId
      • DAGScheduler.getParentStages
        • DAGScheduler.getShuffleMapStage 
          • DAGScheduler.getAncestorShuffleDependencies
            • DAGScheduler.newOrUsedShuffleStage
              • DAGScheduler.newShuffleMapStage

Stage division starts from the Stage containing the last RDD, the ResultStage; here that is the Stage containing G. Before this Stage is created, however, its parent Stages are created, and in this way all parent Stages are recursively created first.

    //DAGScheduler.newResultStage  
    private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    // Get the parent Stages of the current Stage; this method is the core of Stage division
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    // Create the final ResultStage
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
    stageIdToStage(id) = stage // associate the ResultStage with its stageId
    updateJobIdStageIdMaps(jobId, stage) // update the set of stages contained in this job
    stage
  }

DAGScheduler.getParentStagesAndId explores backwards from the current RDD, cutting out parent Stages at wide dependencies, and generates a stageId for the Stage the current RDD belongs to. Because the call chain of getParentStages eventually recurses back into this method, the last Stage gets the largest stageId and earlier Stages get smaller ones; Stages with smaller stageIds execute first.

  //DAGScheduler.getParentStagesAndId
  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, firstJobId) // generate the parent Stages from the rdd and jobId
    // Generate the stageId for the current Stage; Stage ids start from 0 within an Application
    val id = nextStageId.getAndIncrement() 
    (parentStages, id)
  }

The flow of DAGScheduler.getParentStages is: starting from the current rdd, explore its parent rdds; at every wide dependency a parent Stage is produced, while a narrow-dependency rdd is pushed onto the stack so that its own parents can be analyzed in a later round, until either a wide dependency yields a new Stage or the first rdd is reached. A HashSet records the rdds already visited, so each rdd is analyzed only once even when it appears in repeated dependencies. Within a Job, every Stage except the last one (a ResultStage) is a ShuffleMapStage.

    //DAGScheduler.getParentStages  
     private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    // Will hold the parent Stages
    val parents = new HashSet[Stage]
    // Holds the RDDs that have already been visited
    val visited = new HashSet[RDD[_]]
    // Holds the RDDs waiting to be processed
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        // Add to the visited set
        visited += r
        // Iterate over all dependencies of this RDD
        for (dep <- r.dependencies) {
          dep match {
            // For a wide dependency, create (or look up) a new Stage
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, firstJobId)
            // For a narrow dependency, push the rdd onto the stack to be processed later
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    // Push the last RDD onto the stack
    waitingForVisit.push(rdd)
    // Breadth-first traversal
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    // Return the parent Stages as a List
    parents.toList
  }

getParentStages is really just a breadth-first traversal, and once you see that it is easy to understand. Although no Stage has actually been created at this point, the division strategy is clear: traverse the parent RDDs breadth-first and cut Stages along the way.

If the parent RDD and the child RDD have a narrow dependency, the parent RDD is absorbed into the child RDD's Stage; in the figure, B is absorbed into Stage3.

If the parent RDD and the child RDD have a wide dependency, the parent RDD goes into a new Stage; in the figure, F goes into Stage2.

getShuffleMapStage first looks up shuffleToMapStage by shuffleId:

  //DAGScheduler.getShuffleMapStage  
  private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      // If the Stage is found, return it directly
      case Some(stage) => stage
      case None =>
        // Check whether this Stage's parent Stages have been created;
        // if not, create them
        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleToMapStage.contains(dep.shuffleId)) {
            shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
          }
        }
        // Create the new Stage
        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
        // Add the new Stage to the HashMap
        shuffleToMapStage(shuffleDep.shuffleId) = stage
        // Return the new Stage
        stage
    }
  }

  // shuffleToMapStage is the HashMap that keeps track of these Stages
  private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]

You can see that this code looks a lot like newResultStage above, so the whole thing can be viewed as a recursive procedure.

Now let's look at getAncestorShuffleDependencies; as you might expect, it is very similar to the getParentStages used by newResultStage:

  //DAGScheduler.getAncestorShuffleDependencies
  private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val parents = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
                parents.push(shufDep)
              }
            case _ =>
          }
          waitingForVisit.push(dep.rdd)
        }
      }
    }

    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents
  }

Indeed it is very similar to getParentStages; the difference is that it first checks whether shuffleToMapStage already contains a Stage for the shuffle, and if not, pushes the dependency onto the parents Stack. The stack is finally returned to getShuffleMapStage above, which calls newOrUsedShuffleStage to create the new Stages.

Now let's look at how DAGScheduler.newOrUsedShuffleStage creates a new Stage. The results of ShuffleMapTasks (really metadata such as the location and size of the result data) are reported to the mapOutputTracker on the Driver, so we first need to check whether the Stage has already been computed:

  //DAGScheduler.newOrUsedShuffleStage
  private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    // Create the new Stage
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
    // Check whether this Stage has already been computed;
    // if so, copy the existing results into the new stage
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      // If it has not been computed, register the shuffle with the mapOutputTracker
      // to reserve a slot for its output metadata
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

The recursion happens in DAGScheduler.newShuffleMapStage, whose implementation mirrors the newResultStage we started with: it first calls getParentStagesAndId and then creates a ShuffleMapStage:

  //DAGScheduler.newShuffleMapStage
  private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int,
      callSite: CallSite): ShuffleMapStage = {
    // Get the parent Stages and the stageId for the current rdd
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId) 
    // Create the new ShuffleMapStage
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages, 
      firstJobId, callSite, shuffleDep)

    stageIdToStage(id) = stage // associate the ShuffleMapStage with its stageId
    updateJobIdStageIdMaps(firstJobId, stage) // update the set of stages contained in this job
    stage
  }

With that, the entire Stage division is complete.

4. Stage Submission

The call stack is:

  • DAGScheduler.handleJobSubmitted 
    • DAGScheduler.submitStage 
      • DAGScheduler.getMissingParentStages
        • DAGScheduler.submitMissingTasks

First, let's go back to handleJobSubmitted, the last step in Section 2, "Submitting a Job":

//DAGScheduler.handleJobSubmitted
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)

    submitWaitingStages()
  }

In Section 3, "Stage Division", we already covered in depth how finalStage is created:

finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)

Next, let's continue through the rest of handleJobSubmitted:

    //DAGScheduler.handleJobSubmitted    
    // Create the new job
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    // Record the job submission time
    val jobSubmissionTime = clock.getTimeMillis()
    // Register the job as active under its jobId
    jobIdToActiveJob(jobId) = job
    // Add it to the activeJobs HashSet
    activeJobs += job
    // Set this job as the ActiveJob of the finalStage
    finalStage.setActiveJob(job)
    // Collect the stage id information for this job
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    // Notify listeners that the job has started
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // Submit the final stage
    submitStage(finalStage)
    // Submit any stages that were waiting on their parents
    submitWaitingStages()

Now let's see how Stages are submitted. We need to find which parent Stages are missing and create and run those first; this is a depth-first traversal:

  //DAGScheduler.submitStage
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // Get the missing parent Stages
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // If there are no missing parent Stages, this Stage is ready to run.
          // submitMissingTasks finishes the DAGScheduler's final work:
          // submitting Tasks to the TaskScheduler
          submitMissingTasks(stage, jobId.get)
        } else {
          // Depth-first traversal: submit the missing parents first
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

DAGScheduler.getMissingParentStages is also a depth-first traversal:

 //DAGScheduler.getMissingParentStages
  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              // If it is a wide dependency and its map stage is not available,
              // add the stage to the missing HashSet
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              // If it is a narrow dependency,
              // push the rdd onto the stack of rdds waiting to be visited
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

Finally, let's look at the last piece of work in DAGScheduler.submitMissingTasks: submitting the Tasks:

    //DAGScheduler.submitMissingTasks
    private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // pendingPartitions is a HashSet[Int]
    // that stores the partitions whose tasks are still pending
    stage.pendingPartitions.clear()

    // Find the partitions that have not been computed yet
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // From the ActiveJob, get this Stage's scheduling pool,
    // job group description, and other properties
    val properties = jobIdToActiveJob(jobId).properties
    // runningStages is a HashSet[Stage];
    // add the current Stage to the set of running Stages
    runningStages += stage

    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    // Post a SparkListenerStageSubmitted event to the listenerBus
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    var taskBinary: Broadcast[Array[Byte]] = null
    try {
    // For tasks of the final Stage (ResultStage),
    // serialize and broadcast (rdd, func);
    // for tasks of any other Stage,
    // serialize and broadcast (rdd, shuffleDep)
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // If serialization fails, abort this stage
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Stop execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    val tasks: Seq[Task[_]] = try {
    // For the final Stage (ResultStage), create ResultTasks;
    // for any other Stage, create ShuffleMapTasks
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      // Create the TaskSet and submit it
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)

      submitWaitingChildStages(stage)
    }
  }

This article has analyzed the role and structure of the DAGScheduler in the scheduler module, as well as the Stage division process and the final Stage submission. Looking closely at the main code above, you will notice listenerBus.post called in many places: for each Stage event, the event is posted to the LiveListenerBus, which records the Stage-related progress and lets other parts of Spark obtain the latest Stage state in time.
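On the user side, the same LiveListenerBus events can be observed by registering a SparkListener. A minimal hedged sketch (the listener class name is made up; the callbacks are the standard ones from org.apache.spark.scheduler.SparkListener):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart,
    SparkListenerStageCompleted, SparkListenerStageSubmitted}

  // Logs the job and stage events that the DAGScheduler posts to the LiveListenerBus.
  class StageLoggingListener extends SparkListener {
    override def onJobStart(jobStart: SparkListenerJobStart): Unit =
      println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")

    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
      println(s"Stage ${stageSubmitted.stageInfo.stageId} submitted")

    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
      println(s"Stage ${stageCompleted.stageInfo.stageId} completed")
  }

  // Register on an existing SparkContext (assumed to be named sc):
  // sc.addSparkListener(new StageLoggingListener)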
