1.SparkContext源码分析
先来章整个SparkContext的架构图:
1.通过SparkContext的createTaskScheduler获取TaskSchedulerImpl和SparkDeploySchedulerBackend
private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
在createTaskScheduler方法中有如下代码:
//这就是常用的standalone模式 , 仔细分析下面这段代码
case SPARK_REGEX(sparkUrl) =>
//创建TaskScheduler对象
val scheduler = new TaskSchedulerImpl(sc)
//获取spark master地址
val masterUrls = sparkUrl.split(",").map("spark://" + _)
//创建SparkDeploySchedulerBackend对象 , 这里以scheduler作为参数 , 让TaskScheduler和SparkDeploySchedulerBackend相互依赖
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
//让scheduler调度器初始化 , 重点分析
scheduler.initialize(backend)
//将这两个对象作为tuple返回
(backend, scheduler)
现在进入到 scheduler.initialize(backend)方法中 :
//TaskScheduler初始化方法 ,
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
//根据调度模式创建调度对象
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO => //先进先出调度模式
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR => //FAIR调度模式
new FairSchedulableBuilder(rootPool, conf)
}
}
//创建调度池
schedulableBuilder.buildPools()
}
在上面的操作完了之后进入到TaskScheduler的start方法 :
//这里的start方法内部就会去启动SparkDeploySchedulerBackend的start方法了
taskScheduler.start()
SparkDeploySchedulerBackend的start方法代码如下:
//这里就是SparkDeploySchedulerBackend的启动方法了 , 里面封装了我们编写程序的参数配置信息ApplicationDescription
override def start() {
super.start()
// The endpoint for executors to talk to us
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
代码太多 , 就不全部贴完了 , 这个方法最会构造一个ApplicationDescription对象 , 该封装了一个Application的所有信息 , 包括maxCores,name等等
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec)
接下来就是AppClient了的分析了 , AppClient是一个接口 , 通过接口中的内部类ClientActor负责为application与spark进行通信 ,它会拿到master的actor引用与master通信
接收master的url , APPlicationDescription和集群事件的监听器以及各种事件发生时的监听器
class ClientActor extends Actor with ActorLogReceive with Logging {
var master: ActorSelection = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
var alreadyDead = false // To avoid calling listener.dead() multiple times
var registrationRetryTimer: Option[Cancellable] = None
override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
try {
//向master进行注册
registerWithMaster()
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
context.stop(self)
}
}
def tryRegisterAllMasters() {
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterApplication(appDescription)
}
}
//向所有的master进行注册
def registerWithMaster() {
tryRegisterAllMasters()
import context.dispatcher
var retries = 0
registrationRetryTimer = Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
Utils.tryOrExit {
retries += 1
if (registered) {
registrationRetryTimer.foreach(_.cancel())
} else if (retries >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
tryRegisterAllMasters()
}
}
}
}
}
TaskScheduler创建完之后就是DAGScheduler了 , 它是SparkContext中的重要组件,启动代码如下 :
//@volatile 表示DAGScheduler可以被多个线程同时更新
// 这里创建DAGScheduler
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
} catch {
case e: Exception => {
try {
stop()
} finally {
throw new SparkException("Error while constructing DAGScheduler", e)
}
}
}
DAGScheduler实现了面向stage的调度 , 为每一个job计算一个stage的DAG(有向无环图) , 追踪RDD和stage的输出是否被写入磁盘或者是内存 , 同时寻找最优的调度机制来运行job
DAGScheduler将stage最为tasksets提交到TaskSchedulerImpl上 , 好让集群来运行这些task
除此之外还负责找到每个task的最佳位置(后面解释) , 根据当前task的状态将最佳位置交给TaskSchedulerImpl , 同时呢处理shuffle输出文件丢失导致的失败 ,旧的stage可能就会被重新提交
一个stage的内部失败若不是shuffle文件丢失导致的那么会被TaskScheduler处理 , 并多次重试每一个task知道最后实在不行了才会去取消整个stage .
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging { ....
从以上的源码可以总结出SparkContext的用处有以下几点 :
1.通过底层的SparkDeploySchedulerBackend , 针对不同种类的集群模式(standalone,yarn,mesos) , 调度task
2.通过使用一个LocalBackend , 并将isLocal参数设置为true,在本地模式下工作
3.负责处理一些通用的逻辑 , 例如job的调度顺序 , 启动推测任务执行
4.调用TaskScheduler的initialize方法和start方法 , 然后通过runTasks()方法提交task sets