1.SparkContext源码分析

先来章整个SparkContext的架构图:

1.SparkContext源码分析


1.通过SparkContext的createTaskScheduler获取TaskSchedulerImpl和SparkDeploySchedulerBackend
private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)

  1. createTaskScheduler方法中有如下代码:
  2. //这就是常用的standalone模式 , 仔细分析下面这段代码
  3. case SPARK_REGEX(sparkUrl) =>
  4. //创建TaskScheduler对象
  5. val scheduler = new TaskSchedulerImpl(sc)
  6. //获取spark master地址
  7. val masterUrls = sparkUrl.split(",").map("spark://" + _)
  8. //创建SparkDeploySchedulerBackend对象 , 这里以scheduler作为参数 , 让TaskScheduler和SparkDeploySchedulerBackend相互依赖
  9. val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  10. //让scheduler调度器初始化 , 重点分析
  11. scheduler.initialize(backend)
  12. //将这两个对象作为tuple返回
  13. (backend, scheduler)

  现在进入到  scheduler.initialize(backend)方法中 :
   

  1. //TaskScheduler初始化方法 ,
  2. def initialize(backend: SchedulerBackend) {
  3. this.backend = backend
  4. // temporarily set rootPool name to empty
  5. rootPool = new Pool("", schedulingMode, 0, 0)
  6. //根据调度模式创建调度对象
  7. schedulableBuilder = {
  8. schedulingMode match {
  9. case SchedulingMode.FIFO => //先进先出调度模式
  10. new FIFOSchedulableBuilder(rootPool)
  11. case SchedulingMode.FAIR => //FAIR调度模式
  12. new FairSchedulableBuilder(rootPool, conf)
  13. }
  14. }
  15. //创建调度池
  16. schedulableBuilder.buildPools()
  17. }

  在上面的操作完了之后进入到TaskScheduler的start方法 :

  //这里的start方法内部就会去启动SparkDeploySchedulerBackend的start方法了
  

  1. taskScheduler.start()

  SparkDeploySchedulerBackend的start方法代码如下:
   //这里就是SparkDeploySchedulerBackend的启动方法了 , 里面封装了我们编写程序的参数配置信息ApplicationDescription
  

  1. override def start() {
  2. super.start()
  3. // The endpoint for executors to talk to us
  4. val driverUrl = AkkaUtils.address(
  5. AkkaUtils.protocol(actorSystem),
  6. SparkEnv.driverActorSystemName,
  7. conf.get("spark.driver.host"),
  8. conf.get("spark.driver.port"),
  9. CoarseGrainedSchedulerBackend.ACTOR_NAME)


  代码太多 , 就不全部贴完了 , 这个方法最会构造一个ApplicationDescription对象 , 该封装了一个Application的所有信息 , 包括maxCores,name等等
  

  1. val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
  2. appUIAddress, sc.eventLogDir, sc.eventLogCodec)


  接下来就是AppClient了的分析了 , AppClient是一个接口 , 通过接口中的内部类ClientActor负责为application与spark进行通信  ,它会拿到master的actor引用与master通信                                         
  接收master的url , APPlicationDescription和集群事件的监听器以及各种事件发生时的监听器


  1. class ClientActor extends Actor with ActorLogReceive with Logging {
  2. var master: ActorSelection = null
  3. var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
  4. var alreadyDead = false // To avoid calling listener.dead() multiple times
  5. var registrationRetryTimer: Option[Cancellable] = None
  6. override def preStart() {
  7. context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  8. try {
  9. //向master进行注册
  10. registerWithMaster()
  11. } catch {
  12. case e: Exception =>
  13. logWarning("Failed to connect to master", e)
  14. markDisconnected()
  15. context.stop(self)
  16. }
  17. }
  18. def tryRegisterAllMasters() {
  19. for (masterAkkaUrl <- masterAkkaUrls) {
  20. logInfo("Connecting to master " + masterAkkaUrl + "...")
  21. val actor = context.actorSelection(masterAkkaUrl)
  22. actor ! RegisterApplication(appDescription)
  23. }
  24. }
  25. //向所有的master进行注册
  26. def registerWithMaster() {
  27. tryRegisterAllMasters()
  28. import context.dispatcher
  29. var retries = 0
  30. registrationRetryTimer = Some {
  31. context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
  32. Utils.tryOrExit {
  33. retries += 1
  34. if (registered) {
  35. registrationRetryTimer.foreach(_.cancel())
  36. } else if (retries >= REGISTRATION_RETRIES) {
  37. markDead("All masters are unresponsive! Giving up.")
  38. } else {
  39. tryRegisterAllMasters()
  40. }
  41. }
  42. }
  43. }
  44. }

  TaskScheduler创建完之后就是DAGScheduler了 , 它是SparkContext中的重要组件,启动代码如下 :
    

  1. //@volatile 表示DAGScheduler可以被多个线程同时更新
  2. // 这里创建DAGScheduler
  3. @volatile private[spark] var dagScheduler: DAGScheduler = _
  4. try {
  5. dagScheduler = new DAGScheduler(this)
  6. } catch {
  7. case e: Exception => {
  8. try {
  9. stop()
  10. } finally {
  11. throw new SparkException("Error while constructing DAGScheduler", e)
  12. }
  13. }
  14. }


  DAGScheduler实现了面向stage的调度 , 为每一个job计算一个stage的DAG(有向无环图) , 追踪RDD和stage的输出是否被写入磁盘或者是内存 , 同时寻找最优的调度机制来运行job
  DAGScheduler将stage最为tasksets提交到TaskSchedulerImpl上 , 好让集群来运行这些task
  除此之外还负责找到每个task的最佳位置(后面解释) , 根据当前task的状态将最佳位置交给TaskSchedulerImpl , 同时呢处理shuffle输出文件丢失导致的失败 ,旧的stage可能就会被重新提交
  一个stage的内部失败若不是shuffle文件丢失导致的那么会被TaskScheduler处理 , 并多次重试每一个task知道最后实在不行了才会去取消整个stage . 


  1. class DAGScheduler(
  2. private[scheduler] val sc: SparkContext,
  3. private[scheduler] val taskScheduler: TaskScheduler,
  4. listenerBus: LiveListenerBus,
  5. mapOutputTracker: MapOutputTrackerMaster,
  6. blockManagerMaster: BlockManagerMaster,
  7. env: SparkEnv,
  8. clock: Clock = new SystemClock())
  9. extends Logging { ....

  从以上的源码可以总结出SparkContext的用处有以下几点 :
  1.通过底层的SparkDeploySchedulerBackend , 针对不同种类的集群模式(standalone,yarn,mesos) , 调度task
  2.通过使用一个LocalBackend , 并将isLocal参数设置为true,在本地模式下工作
  3.负责处理一些通用的逻辑 , 例如job的调度顺序 , 启动推测任务执行
  4.调用TaskScheduler的initialize方法和start方法 , 然后通过runTasks()方法提交task sets