
1 The Past Life of JobGenerator

1.1 JobGenerator's Partner in Crime: ReceiverTracker


1.2 ReceiverTracker's Partner in Crime: JobGenerator


1.3 ReceiverTracker and receivedBlockTracker: A Love-Hate Relationship


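  • As we can see, receivedBlockTracker lives inside ReceiverTracker. Most importantly, receivedBlockTracker internally maintains streamIdToUnallocatedBlockQueues, which keeps track of the blocks reported by the Executors.

      class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
        private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
        private val receiverInputStreamIds = receiverInputStreams.map { _.id }
        private val receivedBlockTracker = new ReceivedBlockTracker(
          ssc.sparkContext.conf, ssc.sparkContext.hadoopConfiguration, receiverInputStreamIds,
          ssc.scheduler.clock, ssc.isCheckpointPresent, Option(ssc.checkpointDir))
        // ... other members omitted
      }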
  • The key metadata structure inside receivedBlockTracker:

      private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
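To make the role of streamIdToUnallocatedBlockQueues concrete, here is a minimal sketch of the idea in Scala. It is not Spark's code: `BlockInfo`, `MiniBlockTracker`, `addBlock` and `unallocatedCount` are hypothetical names, and the write-ahead logging that the real ReceivedBlockTracker can perform is left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's ReceivedBlockInfo.
case class BlockInfo(streamId: Int, blockId: String, numRecords: Long)

// Sketch only (not Spark's ReceivedBlockTracker): one FIFO queue of
// not-yet-allocated blocks per input stream, keyed by streamId.
class MiniBlockTracker {
  private val streamIdToUnallocatedBlockQueues =
    new mutable.HashMap[Int, mutable.Queue[BlockInfo]]

  // Create the queue for a stream the first time it is needed.
  private def queueOf(streamId: Int): mutable.Queue[BlockInfo] =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new mutable.Queue[BlockInfo])

  // Called when an executor reports a newly stored block for a stream.
  def addBlock(info: BlockInfo): Unit = synchronized {
    queueOf(info.streamId) += info
  }

  // How many blocks of this stream are still waiting to be assigned to a batch.
  def unallocatedCount(streamId: Int): Int = synchronized {
    queueOf(streamId).size
  }
}
```

Reporting two blocks for stream 0 and one for stream 1 leaves two entries in stream 0's queue and one in stream 1's; in this sketch they simply stay queued until some batch claims them.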

1.4 How StreamingContext Joins the Two Swords

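JobScheduler (which StreamingContext creates as its scheduler) contains two heavyweight core members: jobGenerator and receiverTracker. They are initialized as follows:

      // In JobScheduler: created together with the scheduler itself
      private val jobGenerator = new JobGenerator(this)

      // Declared as `var receiverTracker: ReceiverTracker = null`; assigned in JobScheduler.start()
      receiverTracker = new ReceiverTracker(ssc)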
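The ownership structure described above can be sketched with a few toy classes. This is only an illustration under simplifying assumptions: `MiniJobScheduler`, `MiniJobGenerator`, `MiniReceiverTracker` and `generate` are hypothetical names, and only the "who holds a reference to whom" part is modelled.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature classes; only the ownership/reference structure is modelled.
class MiniReceiverTracker {
  val allocatedBatches = ArrayBuffer[Long]()
  def allocateBlocksToBatch(timeMs: Long): Unit = allocatedBatches += timeMs
}

class MiniJobScheduler {
  // Created together with the scheduler and given a back-reference, like `new JobGenerator(this)`.
  val jobGenerator = new MiniJobGenerator(this)
  // Only assigned in start(), like JobScheduler's receiverTracker.
  var receiverTracker: MiniReceiverTracker = null

  def start(): Unit = {
    receiverTracker = new MiniReceiverTracker
    jobGenerator.start()
  }
}

class MiniJobGenerator(jobScheduler: MiniJobScheduler) {
  var started = false
  def start(): Unit = { started = true }

  // The generator does not own the tracker; it reaches it through the scheduler.
  def generate(timeMs: Long): Unit = jobScheduler.receiverTracker.allocateBlocksToBatch(timeMs)
}
```

Because the generator only keeps a back-reference to the scheduler, it always sees whatever tracker the scheduler holds at the moment it generates a batch, even though the tracker is created later than the generator.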


2 The Present Life of JobGenerator

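  • An important member of JobGenerator is RecurringTimer, which fires once per user-defined batch interval (batchDuration) and posts a GenerateJobs event for that time:
      private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
        longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")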
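The timer's behaviour boils down to a little arithmetic: the first trigger is aligned to the next multiple of the batch interval, and later triggers follow every interval. The sketch below reproduces only that arithmetic; `TimerMath`, `startTime` and `triggerTimes` are hypothetical names, and the start-time formula is, to our reading, the one RecurringTimer.getStartTime uses in Spark 2.x.

```scala
// Sketch of the timing arithmetic behind RecurringTimer (not Spark's class itself).
object TimerMath {
  // First trigger time: the next multiple of periodMs strictly after nowMs.
  def startTime(nowMs: Long, periodMs: Long): Long =
    (math.floor(nowMs.toDouble / periodMs) + 1).toLong * periodMs

  // The first n trigger times; in JobGenerator each one would become GenerateJobs(new Time(t)).
  def triggerTimes(startMs: Long, periodMs: Long, n: Int): Seq[Long] =
    (0 until n).map(i => startMs + i * periodMs)
}
```

With a 1000 ms batch interval and a clock reading of 1234 ms, the first batch fires at 2000 ms and the next ones at 3000 ms and 4000 ms, so batch times are always whole multiples of the interval.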
  • JobGenerator is started from StreamingContext (StreamingContext.start -> JobScheduler.start -> JobGenerator.start); on a fresh start without a checkpoint, start() ends by calling startFirstTime:
      def start(): Unit = synchronized {
        if (eventLoop != null) return // generator has already been started
        // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
        // See SPARK-10125
        checkpointWriter
        eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
          override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
          override protected def onError(e: Throwable): Unit = {
            jobScheduler.reportError("Error in job generator", e)
          }
        }
        eventLoop.start()
        if (ssc.isCheckpointPresent) {
          restart()        // recover from the checkpoint
        } else {
          startFirstTime()
        }
      }
  • Finally, JobGenerator starts ssc.graph and the timer, and from that moment the whole processing pipeline is running.
      private def startFirstTime() {
        val startTime = new Time(timer.getStartTime())
        graph.start(startTime - graph.batchDuration)
        timer.start(startTime.milliseconds)
        logInfo("Started JobGenerator at " + startTime)
      }
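The EventLoop used by start() follows a common pattern: one background thread takes events from a blocking queue and handles them strictly in the order they were posted. Below is a minimal sketch of that pattern, not Spark's EventLoop. `MiniEventLoop`, `MiniJobGenerator`, `GeneratorEvent` and `handledTimes` are hypothetical, and stopping via an internal marker (so that already-posted events are still handled) is a simplification chosen for this example; Spark's EventLoop interrupts its thread instead.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Sketch of the event-loop pattern: a single daemon thread drains a blocking queue.
abstract class MiniEventLoop[E](name: String) {
  // None is an internal stop marker, so stop() lets earlier events finish first.
  private val queue = new LinkedBlockingQueue[Option[E]]()

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case Some(event) =>
            try onReceive(event) catch { case NonFatal(e) => onError(e) }
          case None =>
            running = false
        }
      }
    }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(Some(event))
  def stop(): Unit = { queue.put(None); thread.join() }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// Hypothetical event type standing in for JobGeneratorEvent.
sealed trait GeneratorEvent
case class GenerateJobs(timeMs: Long) extends GeneratorEvent

class MiniJobGenerator {
  private var eventLoop: MiniEventLoop[GeneratorEvent] = null
  // Written only by the event-loop thread; read after stop(), whose join() makes it visible.
  val handledTimes = ArrayBuffer[Long]()
  val errors = ArrayBuffer[Throwable]()

  def start(): Unit = synchronized {
    if (eventLoop == null) { // a second start() is a no-op, like the guard in JobGenerator.start
      eventLoop = new MiniEventLoop[GeneratorEvent]("MiniJobGenerator") {
        override protected def onReceive(event: GeneratorEvent): Unit = processEvent(event)
        override protected def onError(e: Throwable): Unit = errors += e
      }
      eventLoop.start()
    }
  }

  def post(event: GeneratorEvent): Unit = eventLoop.post(event)
  def stop(): Unit = eventLoop.stop()

  private def processEvent(event: GeneratorEvent): Unit = event match {
    case GenerateJobs(t) => handledTimes += t
  }
}
```

Funnelling every timer tick through one queue and one thread means batches are processed one at a time, in time order, without extra locking in processEvent.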

2.1 The Four Core Processing Steps of JobGenerator
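For each batch time, JobGenerator.generateJobs runs four steps in order: allocate the received blocks to the batch, let the graph generate the jobs, fetch the batch's input info, and submit the job set. As far as we can tell from the Spark 2.x source, steps 1 and 2 are wrapped in a Try, so a failure is reported instead of submitting anything (the real method also posts a DoCheckpoint event afterwards, omitted here). The skeleton below is a sketch of that control flow only; `FourStepGenerator`, `Job` and `JobSet` are hypothetical simplified types, and the four steps are injected as functions so it runs without Spark.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical minimal types standing in for Spark's Job / JobSet / StreamInputInfo.
case class Job(timeMs: Long, name: String)
case class JobSet(timeMs: Long, jobs: Seq[Job], inputInfos: Map[Int, Long])

// Sketch of the control flow of JobGenerator.generateJobs.
class FourStepGenerator(
    allocateBlocksToBatch: Long => Unit,      // step 1
    graphGenerateJobs: Long => Seq[Job],      // step 2
    getInputInfo: Long => Map[Int, Long],     // step 3
    submitJobSet: JobSet => Unit,             // step 4
    reportError: (String, Throwable) => Unit) {

  def generateJobs(timeMs: Long): Unit = {
    Try {
      allocateBlocksToBatch(timeMs)
      graphGenerateJobs(timeMs)
    } match {
      case Success(jobs) =>
        val inputInfos = getInputInfo(timeMs)
        submitJobSet(JobSet(timeMs, jobs, inputInfos))
      case Failure(e) =>
        reportError(s"Error generating jobs for time $timeMs", e)
    }
  }
}
```

Step 1 must run before step 2: the jobs built in step 2 read exactly the blocks that step 1 assigned to this batch time.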


2.2 Step 1: allocateBlocksToBatch

  • JobGenerator holds a reference to jobScheduler, and jobScheduler holds a reference to receiverTracker:

       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
  • receiverTracker holds a reference to receivedBlockTracker and delegates the allocation to it.

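  • For each streamId, all blocks collected so far are taken out of streamIdToUnallocatedBlockQueues (a receiver cuts a new block every 200 ms by default, controlled by spark.streaming.blockInterval) and put into timeToAllocatedBlocks under the batch time.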

      /**
       * Allocate all unallocated blocks to the given batch.
       * This event will get written to the write ahead log (if enabled).
       */
      def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
        if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
          val streamIdToBlocks = streamIds.map { streamId =>
            (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
          }.toMap                                                   // <= the crux
          val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)    // <= the crux
          if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
            timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
            lastAllocatedBatchTime = batchTime
          } else {
            logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
          }
        } else {
          // This situation occurs when:
          // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
          // possibly processed batch job or half-processed batch job need to be processed again,
          // so the batchTime will be equal to lastAllocatedBatchTime.
          // 2. Slow checkpointing makes recovered batch time older than WAL recovered
          // lastAllocatedBatchTime.
          // This situation will only occurs in recovery time.
          logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
        }
      }
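The core move performed by allocateBlocksToBatch can be reproduced in a few lines: drain every stream's unallocated queue and file the result under the batch time, ignoring batch times that are not newer than the last one allocated. The sketch below assumes no write-ahead log (the real method only records the allocation if writeToLog succeeds); `MiniReceivedBlockTracker`, `BlockInfo`, `AllocatedBlocks` and `unallocatedCount` are hypothetical simplified names, with batch times modelled as plain Long milliseconds.

```scala
import scala.collection.mutable

// Hypothetical simplified types.
case class BlockInfo(streamId: Int, blockId: String, numRecords: Long)
case class AllocatedBlocks(streamIdToBlocks: Map[Int, Seq[BlockInfo]])

class MiniReceivedBlockTracker(streamIds: Seq[Int]) {
  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, mutable.Queue[BlockInfo]]
  private val timeToAllocatedBlocks = new mutable.HashMap[Long, AllocatedBlocks]
  private var lastAllocatedBatchTime: Option[Long] = None

  private def queueOf(id: Int): mutable.Queue[BlockInfo] =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(id, new mutable.Queue[BlockInfo])

  def addBlock(info: BlockInfo): Unit = synchronized { queueOf(info.streamId) += info }

  // Move every unallocated block of every stream into batch `batchTime` (no WAL in this sketch).
  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    if (lastAllocatedBatchTime.forall(batchTime > _)) {
      val streamIdToBlocks = streamIds.map { id =>
        (id, queueOf(id).dequeueAll(_ => true).toList)
      }.toMap
      timeToAllocatedBlocks.put(batchTime, AllocatedBlocks(streamIdToBlocks))
      lastAllocatedBatchTime = Some(batchTime)
    }
  }

  // What a ReceiverInputDStream would read back for its batch.
  def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[BlockInfo]] = synchronized {
    timeToAllocatedBlocks.get(batchTime).map(_.streamIdToBlocks)
      .getOrElse(Map.empty[Int, Seq[BlockInfo]])
  }

  def unallocatedCount(id: Int): Int = synchronized { queueOf(id).size }
}
```

After allocating batch 1000, the queues are empty and the blocks are reachable only via getBlocksOfBatch(1000); a block reported later waits for the next, newer batch time.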
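  • timeToAllocatedBlocks is a core member of receiverTracker (more precisely, of its member receivedBlockTracker). If you trace the call chain back to its top, the RDDs stored in generatedRDDs are ultimately built from timeToAllocatedBlocks.
  • streamIdToUnallocatedBlockQueues: the blocks that have not yet been assigned to a batch.
  • timeToAllocatedBlocks: the blocks that have already been assigned to a batch.
  • Below is template code from DStream whose sole purpose is to generate RDDs. getOrCompute is defined once, as a final method of the DStream base class, so once a parent level has generated its RDD, that RDD is put into generatedRDDs.
  • If generatedRDDs has no entry for the batch time, compute is called; compute calls the parent's getOrCompute, which in turn calls the parent's compute, and so on, recursing all the way back to the compute of the InputDStream.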
     /**
      * Get the RDD corresponding to the given time; either retrieve it from cache
      * or compute-and-cache it.
      */
    private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
      // If RDD was already generated, then retrieve it from HashMap,
      // or else compute the RDD
      generatedRDDs.get(time).orElse {
        // Compute the RDD if time is valid (e.g. correct time in a sliding window)
        // of RDD generation, else generate nothing.
        if (isTimeValid(time)) {
          val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
            // Disable checks for existing output directories in jobs launched by the streaming
            // scheduler, since we may need to write output to an existing directory during checkpoint
            // recovery; see SPARK-4835 for more details. We need to have this call here because
            // compute() might cause Spark jobs to be launched.
            SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
              compute(time)
            }
          }
          rddOption.foreach { case newRDD =>
            // Register the generated RDD for caching and checkpointing
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel)
              logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
            }
            if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint()
              logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
            }
            generatedRDDs.put(time, newRDD)                           // <= the crux
          }
          rddOption
        } else {
          None
        }
      }
    }
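The recursion between getOrCompute and compute is a memoized walk up the DStream lineage. The toy version below models an "RDD" as a plain Seq[Int] and keeps only the caching logic (no isTimeValid, persistence or checkpointing). `MiniDStream`, `MiniInputDStream`, `MiniMappedDStream` and the `computeCalls` counter are hypothetical names introduced for illustration.

```scala
import scala.collection.mutable

// Hypothetical miniature DStream: an "RDD" is modelled as a plain Seq[Int].
abstract class MiniDStream {
  private val generatedRDDs = new mutable.HashMap[Long, Seq[Int]]
  var computeCalls = 0 // instrumentation for the example: counts cache misses

  protected def compute(time: Long): Option[Seq[Int]]

  // Same shape as DStream.getOrCompute: reuse the cached result, else compute and cache it.
  final def getOrCompute(time: Long): Option[Seq[Int]] =
    generatedRDDs.get(time).orElse {
      computeCalls += 1
      val result = compute(time)
      result.foreach(rdd => generatedRDDs.put(time, rdd))
      result
    }
}

// Leaf of the chain, playing the role of an InputDStream.
class MiniInputDStream(data: Map[Long, Seq[Int]]) extends MiniDStream {
  protected def compute(time: Long): Option[Seq[Int]] = data.get(time)
}

// Plays the role of MapPartitionedDStream: ask the parent first, then transform.
class MiniMappedDStream(parent: MiniDStream, f: Int => Int) extends MiniDStream {
  protected def compute(time: Long): Option[Seq[Int]] = parent.getOrCompute(time).map(_.map(f))
}
```

Asking the last stream of a chain for a batch makes each ancestor compute its result once; a second request for the same batch time is served from generatedRDDs at the top level without touching the parents again.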
  • The compute method of MapPartitionedDStream:
       override def compute(validTime: Time): Option[RDD[U]] = {
         parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
       }
  • The compute method of ReceiverInputDStream:
      /**
       * Generates RDDs with blocks received by the receiver of this stream. */
      override def compute(validTime: Time): Option[RDD[T]] = {
        val blockRDD = {
          if (validTime < graph.startTime) {
            // If this is called for any time before the start time of the context,
            // then this returns an empty RDD. This may happen when recovering from a
            // driver failure without any write ahead log to recover pre-failure data.
            new BlockRDD[T](ssc.sc, Array.empty)
          } else {
            // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
            // for this batch
            val receiverTracker = ssc.scheduler.receiverTracker                                   // <= the crux
            val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)  // <= the crux
            // Register the input blocks information into InputInfoTracker
            val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
            ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
            // Create the BlockRDD
            createBlockRDD(validTime, blockInfos)                                                  // <= the crux
          }
        }
        Some(blockRDD)
      }

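So the job of allocateBlocksToBatch is to put the blocks of the corresponding batch window into timeToAllocatedBlocks, where the call chain (ReceiverInputDStream.compute, via getBlocksOfBatch) can pick them up.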
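The branch logic of ReceiverInputDStream.compute can be isolated as follows: before the start time it yields an empty block RDD; otherwise it looks up this stream's blocks for the batch, reports the record count, and wraps the block ids. This is a simplified sketch: `MiniReceiverInputDStream` and `MiniBlockRDD` are hypothetical, and the tracker lookup and the input-info report are passed in as plain functions. As in Spark's ReceivedBlockInfo, `numRecords` is an Option, which is why the sum uses `flatMap`.

```scala
// Hypothetical simplified types.
case class BlockInfo(blockId: String, numRecords: Option[Long])
case class StreamInputInfo(inputStreamId: Int, numRecords: Long)
case class MiniBlockRDD(blockIds: Seq[String])

class MiniReceiverInputDStream(
    id: Int,
    startTime: Long,
    getBlocksOfBatch: Long => Map[Int, Seq[BlockInfo]], // stands in for receiverTracker.getBlocksOfBatch
    reportInfo: (Long, StreamInputInfo) => Unit) {     // stands in for inputInfoTracker.reportInfo

  def compute(validTime: Long): Option[MiniBlockRDD] = {
    val blockRDD =
      if (validTime < startTime) {
        MiniBlockRDD(Seq.empty) // before the context started: an empty RDD
      } else {
        val blockInfos = getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
        // Blocks without a known record count contribute nothing to the sum.
        reportInfo(validTime, StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum))
        MiniBlockRDD(blockInfos.map(_.blockId))
      }
    Some(blockRDD)
  }
}
```

Note that the input stream never touches the unallocated queues: it only reads what allocateBlocksToBatch already filed under this batch time.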

2.3 Step 2: graph.generateJobs

  • The core role of DStreamGraph is to hold the registered outputStreams. So when do they get registered?
  • Through an output operation: print -> foreachRDD -> ForEachDStream -> register -> ssc.graph.addOutputStream(this)
  • DStreamGraph.generateJobs ultimately calls outputStream.generateJob(time):
      private val inputStreams = new ArrayBuffer[InputDStream[_]]()
      private val outputStreams = new ArrayBuffer[DStream[_]]()

      def generateJobs(time: Time): Seq[Job] = {
        logDebug("Generating jobs for time " + time)
        val jobs = this.synchronized {
          outputStreams.flatMap { outputStream =>
            val jobOption = outputStream.generateJob(time)
            jobOption.foreach(_.setCallSite(outputStream.creationSite))
            jobOption
          }
        }
        logDebug("Generated " + jobs.length + " jobs for time " + time)
        jobs
      }
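The registration path and the flatMap over outputStreams can be mimicked with two small classes: output operations add themselves to the graph, and generating jobs for a batch yields one job per registered output that produced something. `MiniGraph`, `MiniOutputStream`, `hasDataAt` and this simplified `Job` are hypothetical names for illustration only.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's Job: just the batch time and which output produced it.
case class Job(timeMs: Long, outputName: String)

// Plays the role of ForEachDStream (hypothetical).
class MiniOutputStream(val name: String, graph: MiniGraph, hasDataAt: Long => Boolean) {
  // Mirrors DStream.register(): the output operation adds itself to the graph.
  def register(): this.type = { graph.addOutputStream(this); this }

  def generateJob(timeMs: Long): Option[Job] =
    if (hasDataAt(timeMs)) Some(Job(timeMs, name)) else None
}

class MiniGraph {
  private val outputStreams = new ArrayBuffer[MiniOutputStream]()

  def addOutputStream(s: MiniOutputStream): Unit = synchronized { outputStreams += s }

  // One job per registered output stream that yields something for this batch.
  def generateJobs(timeMs: Long): Seq[Job] = synchronized {
    outputStreams.toList.flatMap(_.generateJob(timeMs))
  }
}
```

A stream that never calls register() produces no jobs at all, which mirrors why a Spark Streaming application without any output operation has nothing to run.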
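  • outputStream.generateJob defines jobFunc and wraps it as new Job(time, jobFunc). The code below is the base implementation in DStream; ForEachDStream, the class behind every output operation, overrides it so that jobFunc runs the user's foreachFunc on the parent's RDD instead.
       private[streaming] def generateJob(time: Time): Option[Job] = {
         getOrCompute(time) match {
           case Some(rdd) =>
             val jobFunc = () => {
               val emptyFunc = { (iterator: Iterator[T]) => {} }
               context.sparkContext.runJob(rdd, emptyFunc)
             }
             Some(new Job(time, jobFunc))
           case None => None
         }
       }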
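Generating a job does not run anything yet: a Job is essentially a batch time plus a deferred closure, and the closure executes only when the job is run later by the scheduler. Spark's Job.run() keeps the outcome of the function as a Try; the sketch below imitates that idea. `MiniJob`, `JobSketch`, `outcome` and `sink` are hypothetical names, and the "RDD" is again a plain Seq[Int].

```scala
import scala.util.Try

// Sketch of a Job as "batch time + deferred function"; the outcome is kept as a Try.
class MiniJob(val timeMs: Long, func: () => Any) {
  private var result: Option[Try[Any]] = None
  def run(): Unit = { result = Some(Try(func())) }
  def outcome: Option[Try[Any]] = result
}

object JobSketch {
  // Same shape as DStream.generateJob: build a job only if an "RDD" exists for this batch.
  def generateJob(
      timeMs: Long,
      getOrCompute: Long => Option[Seq[Int]],
      sink: Seq[Int] => Unit): Option[MiniJob] =
    getOrCompute(timeMs) match {
      case Some(rdd) => Some(new MiniJob(timeMs, () => sink(rdd)))
      case None      => None
    }
}
```

Keeping generation and execution apart is what lets JobGenerator build all jobs for a batch on its own thread while JobScheduler decides later where and when they run.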

2.4 Step 3: jobScheduler.inputInfoTracker.getInfo(time)

  • This step fetches the metadata (input information) of the blocks that belong to the current batch.

      // Map to track all the InputInfo related to specific batch time and input stream.
      private val batchTimeToInputInfos =
        new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

      case class StreamInputInfo(
          inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty)
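The bookkeeping behind the two-level map can be sketched as follows: ReceiverInputDStream.compute reports one entry per (batch, stream), and step 3 reads back an immutable snapshot for the batch. `MiniInputInfoTracker` is a hypothetical name and batch times are plain Long milliseconds; rejecting a second report for the same stream in the same batch mirrors, to our reading, what Spark's InputInfoTracker.reportInfo does.

```scala
import scala.collection.mutable

case class StreamInputInfo(
    inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty)

// Sketch of the bookkeeping behind inputInfoTracker (simplified, hypothetical class name).
class MiniInputInfoTracker {
  private val batchTimeToInputInfos =
    new mutable.HashMap[Long, mutable.HashMap[Int, StreamInputInfo]]

  // Called once per (batch, stream), e.g. from ReceiverInputDStream.compute.
  def reportInfo(batchTime: Long, info: StreamInputInfo): Unit = synchronized {
    val infos = batchTimeToInputInfos.getOrElseUpdate(batchTime,
      new mutable.HashMap[Int, StreamInputInfo])
    if (infos.contains(info.inputStreamId)) {
      throw new IllegalStateException(
        s"Input stream ${info.inputStreamId} already reported for batch $batchTime")
    }
    infos(info.inputStreamId) = info
  }

  // Step 3 of JobGenerator: an immutable snapshot of everything reported for this batch.
  def getInfo(batchTime: Long): Map[Int, StreamInputInfo] = synchronized {
    batchTimeToInputInfos.get(batchTime).map(_.toMap)
      .getOrElse(Map.empty[Int, StreamInputInfo])
  }
}
```

Returning an immutable copy means the JobSet built in step 4 carries a stable view of the batch's input, even if the tracker's internal maps change afterwards.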

2.5 Step 4: jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

  • JobGenerator holds a reference to JobScheduler, which finally submits the jobs; from here on, the jobs drive the computation on the Executors.
    def submitJobSet(jobSet: JobSet) {
      if (jobSet.jobs.isEmpty) {
        logInfo("No jobs added for time " + jobSet.time)
      } else {
        jobSets.put(jobSet.time, jobSet)
        jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
        logInfo("Added jobs for time " + jobSet.time)
      }
    }
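The submission logic above amounts to: skip empty job sets, remember non-empty ones by batch time, and hand each job to a thread pool. The sketch below reproduces that with a plain java.util.concurrent fixed pool. In Spark, jobExecutor is a daemon fixed pool whose size comes from spark.streaming.concurrentJobs (default 1) and each job is wrapped in a JobHandler; here `MiniJobScheduler`, `pendingBatches` and the simplified `Job`/`JobSet` types are hypothetical, and `println` stands in for logInfo.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical minimal types.
case class Job(timeMs: Long, func: () => Unit)
case class JobSet(timeMs: Long, jobs: Seq[Job])

// Sketch of JobScheduler.submitJobSet on top of a fixed-size thread pool.
class MiniJobScheduler(numConcurrentJobs: Int = 1) {
  private val jobSets = new ConcurrentHashMap[Long, JobSet]()
  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

  def submitJobSet(jobSet: JobSet): Unit = {
    if (jobSet.jobs.isEmpty) {
      println(s"No jobs added for time ${jobSet.timeMs}")
    } else {
      jobSets.put(jobSet.timeMs, jobSet)
      jobSet.jobs.foreach { job =>
        jobExecutor.execute(new Runnable { def run(): Unit = job.func() })
      }
      println(s"Added jobs for time ${jobSet.timeMs}")
    }
  }

  // Number of batches remembered so far (this sketch never removes finished ones).
  def pendingBatches: Int = jobSets.size

  // Let already-submitted jobs finish, then release the pool's threads.
  def stop(): Unit = {
    jobExecutor.shutdown()
    jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

With a pool of size 1, jobs from consecutive batches run strictly one after another; a larger pool lets jobs overlap, which is the trade-off the concurrentJobs setting controls.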

3 Summary


秦凯新 (Qin Kaixin), Shenzhen, 2018