spark流程源码解析

这篇文章是编写的spark core流程笔记的入口

现在只是编写自己理解的笔记,后面会补上所有文中相关链接,

一,起源之地:sparkContext

首先了解spark程序:

  1. Spark 程序在运行的时候分为 Driver 和 Executor 两部分
  2. Spark 程序编写是基于 SparkContext 的,具体来说包含两方面
    • Spark 编程的核心 基础-RDD 是由SparkContext 来最初创建的(第一个RDD一定是由 SparkContext来创建的)
    • Spark 程序的调度优化也是基于 SparkContext,首先进行调度优化。
  3. Spark程序的注册时通过 SparkContext 实例化时候生产的对象来完成的(其实是 SchedulerBackend 来注册程序)
  4. Spark 程序在运行的时候要通过 Cluster Manager 获取具体的计算资源,计算资源获取也是通过 SparkContext 产生的对象来申请的(其实是 SchedulerBackend 来获取计算资源的)
  5. 整个SparkContext 崩溃或者结束的时候整个Spark 程序也结束啦!

SparkContext 构建的对象

三大*核心:DAGScheduler, TaskScheduler, SchedulerBackend

  1. DAGScheduler 是面向 Job 的 Stage 的高層調度器;
  2. TaskScheduler 是一個接口,是低層調度器,根據具體的 ClusterManager
    的不同會有不同的實現,Standalone 模式下具體的實現 TaskSchedulerImpl;
  3. SchedulerBackend 是一個接口,根據具體的 ClusterManager 的不同會有不同的實現,Standalone模式下具體的實現是SparkDeploySchedulerBackend

從整個程序運行的角度來講,SparkContext 包含四大核心對象
DAGScheduler, TaskScheduler, SchedulerBackend, MapOutputTrackerMaster

创建spark core运行环境sparkEnv

sparkEnv概述

sparkEnv是spark的执行环境,其中包括众多与Executor执行相关的对象。在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下worker另起的CoarseGrainedExecutorBackend进程中也会创建Executor,所以SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中
spark流程源码解析
其实在sparkContext初始化的时候,创建的是DriverEnv环境
conf:sparkConf,spark的环境配置。
isLocal:模式判断。
listenerBus:事件监听总线。
SparkContext.numDriverCores(master):Driver的核数。
然后跳进方法体create(),dirver和executor都是调用的这个创建方法。

具体步骤

sparkContext核心对象的创建

spark流程源码解析

  1. 核心对象创建入口是sparkContext中的createTaskScheduler()方法
  createTaskScheduler(
          sc: SparkContext,
          master: String,
          deployMode: String): (SchedulerBackend, TaskScheduler) = {
        import SparkMasterRegex._
  1. 根据master的状态来进行创建对象,这里是spark standAlone模式入口,**注意这几行代码,后面都是这几行代码的细化,**这里可以看出createTaskScheduler()方法返回的TaskSchedulerImpl和 StandaloneSchedulerBackend
 case SPARK_REGEX(sparkUrl) =>
       val scheduler = new TaskSchedulerImpl(sc)     (步骤3)
       val masterUrls = sparkUrl.split(",").map("spark://" + _)(步骤4)
       val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)(步骤5-10)
       scheduler.initialize(backend)(步骤11-12)
       (backend, scheduler)
  1. 根据代码就知道是先创建TaskSchedulerImpl,将sparkContext传入实例化,但是并没有进行初始化,

  2. 匹配集群中 master 的地址 e.g. spark://

  3. 然后创建StandaloneSchedulerBackend对象(注意不是SparkDeploySchedulerBackend,改名了),进入后发现
    private[spark] class StandaloneSchedulerBackend(`private[spark] class StandaloneSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, masters: Array[String]) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with StandaloneAppClientListener with Logging {

  4. StandaloneSchedulerBackend是CoarseGrainedSchedulerBackend的子类,调用StandaloneSchedulerBackend的start方法是调用父类CoarseGrainedSchedulerBackend的start方法,

override def start() {
    super.start()
  1. 然后创建一个很重要的对象StandaloneAppClient(就是APPClient),然后调用它的 start( ) 方法,创建一个 ClientEndpoint 对象。
 def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }
  1. ClientEndpoint对象是一个 RpcEndPoint,然后接下来的故事就是向 Master 注冊,首先实例化后调用自己的 onStart 方法
    RpcEndpoint的声明周期==constructor -> onStart -> receive -> onStop*

在实例化的时候根据Spark的RPC的消息工作机制会调用生命周期方法onStart方法,在该方法执行时会执行Option(self).foreach(_.send(ReviveOffers))来周期性地发ReviveOffers消息给自己,ReviveOffers是个空的object,会触发makeOffers来‘Make fake resource offers on all executors’。

work接收消息运行机制

开始创建的时候是发送的空的,这是在等待执行具体的task的时候用的。

 override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }
  1. 然后再调用 registerWithMaster 方法,从 registerWithMaster 调用 tryRegisterAllMasters,开一条新的线程(利用线程池registerMasterThreadPool)来注冊,然后发送一条信息(RegisterApplication 的case class ) 给 Master,注冊是通过 Thread 来完成的。Master 收到了这个信息便开始注冊,注冊后最后再次调用 schedule( ) 方法 master的作用点击

    通过StandAloneSchedulerBackend 注册到Master 的时候会将以上的 command 提交给 Master ,请注意org.apache.spark.executor.CoarseGrainedExecutorBackend,将来会通过这个启动启动执行的executor。
    master发指令给worker去启动Executor所有的进程的时候加载的main方法所在的入口类就是coommand中的CoarseGrainedExecutorBackend,当然你可以实现自己的ExecutorBackend,在CoarseGrainnedExecutorBackend中启动Executor(Executor是先注册在实例化),Executor通过线程值并发执行Task。

/**
     * Register with all masters asynchronously. It will call `registerWithMaster` every
     * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
     * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
     *
     * nthRetry means this is the nth attempt to register with master.
     */
 private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }
  1. 这里代码会利用future类等待注册启动executor,这时候设置SparkApp为运行状态,然后开始TaskSchedulerImpl的初始化,调用initialize( )方法
 client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  1. 进入initialize()方法,创建资源配置池和资源调度算法,同时通过SchdulableBuilder.addTaskSetmanager:SchdulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManager来确定每个Task具体运行在哪个ExecutorBackend中。
def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

12.然后返回TaskSchedulerImpl对象和StandaloneSchedulerBackend对象,然后说下DAGScheduler和taskScheduler对象,在返回对象的时候DAGscheduler(初始为null)是包含在TaskSchedulerImpl里面的,而TaskSchedulerImpl其实就是TaskScheduler,里面也包含TaskSetManage。

总结

总的来说sparkContext就是把在程序运行过程中需要的东西都初始化出来,包括环境,管理等,在后面再进行操作,sparkContext的流程其实就是程序运行的大概流程,就只剩下一些细节方面的知识