Launching the Spark Driver
1. Introduction
What is the Driver? The official definition reads: "The process running the main() function of the application and creating the SparkContext."
That is, it is the process that runs the application's main function and creates the SparkContext. The application here is the program we write ourselves and submit to the Spark cluster.
The figure above shows the overall framework of a running Spark program. In summary: first the Driver program starts and creates the SparkContext; it then communicates with the ClusterManager, which launches Executors on the appropriate Workers according to the program's requirements; finally the Driver communicates with the Executors and dispatches tasks to them for execution. There are many details in between, such as task scheduling, the DAGScheduler, the Shuffle phase, and so on, which will be covered in later posts. This post only covers the launch of the Driver; the source code is based on Spark 2.4.0.
2. The Driver launch flow
The previous post covered job submission via Spark Submit through the legacy gateway, which creates a ClientApp. In the Client's onStart method:
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
      // truncate filesystem paths similar to what YARN does. For now, we just require
      // people call `addJar` assuming the jar is in the same directory.
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

      val classPathConf = "spark.driver.extraClassPath"
      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = "spark.driver.extraLibraryPath"
      val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val extraJavaOptsConf = "spark.driver.extraJavaOptions"
      val extraJavaOpts = sys.props.get(extraJavaOptsConf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)

      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)

      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))

    case "kill" =>
      val driverId = driverArgs.driverId
      asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}
In the onStart method above, the Client first builds a DriverDescription and then sends a RequestSubmitDriver message to the Master. In other words, after we submit the program, the Client created for it asks the Master to launch a Driver. Next, let's see how the Master handles this message. Enter the Master:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // The Master pattern-matches on the received message
  case RequestSubmitDriver(description) =>
    // First check whether the Master is in the ALIVE state
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      // If the Master is not alive, reply with a failure message
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      // Create the driver from the DriverDescription
      val driver = createDriver(description)
      // Persist the driver's information
      persistenceEngine.addDriver(driver)
      // Add the driver to the waiting queue
      waitingDrivers += driver
      // Add the driver to the HashSet of drivers
      drivers.add(driver)
      // Run the scheduler
      schedule()

      // TODO: It might be good to instead have the submission client poll the master to determine
      // the current status of the driver. For now it's simply "fire and forget".
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }
}
After receiving the Client's driver-submission message, the Master first creates a Driver, then adds it to the waiting queue for later scheduling. Let's look at how the Driver is created:
private def createDriver(desc: DriverDescription): DriverInfo = {
  val now = System.currentTimeMillis()
  // Create the submission date
  val date = new Date(now)
  // Wrap the driver's information in a DriverInfo object
  new DriverInfo(now, newDriverId(date), desc, date)
}
Once the Driver is created, its information is added to the queue, and finally scheduling runs. Let's look at the scheduling method, schedule():
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  // Randomly shuffle the cluster's ALIVE workers into shuffledAliveWorkers
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Count the ALIVE workers
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // Iterate over the waiting drivers
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    // Counts how many workers have been visited
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // Take the worker at the current position in shuffledAliveWorkers
      val worker = shuffledAliveWorkers(curPos)
      // Increment the number of visited workers
      numWorkersVisited += 1
      // If this worker's free resources satisfy the driver's requirements
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // then launch the driver on this worker
        launchDriver(worker, driver)
        // Remove the launched driver from the waiting queue
        waitingDrivers -= driver
        // Mark the driver as launched
        launched = true
      }
      // Advance to the next worker
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Start executors on the workers
  startExecutorsOnWorkers()
}
During scheduling, the cluster's alive workers are shuffled into a random sequence, which is then traversed; as soon as a worker's free resources satisfy the Driver's requirements, the Driver is launched on that worker. Next, let's look at how the Driver is launched, inside the launchDriver method:
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Record the driver on the worker
  worker.addDriver(driver)
  // Record the worker in the driver's info
  driver.worker = Some(worker)
  // Send a LaunchDriver message to the chosen worker
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  // Mark the driver's state as RUNNING
  driver.state = DriverState.RUNNING
}
After recording the worker in the Driver's info, the Master sends a LaunchDriver message to that worker. When the worker receives the message, it runs the driver-launch code. Let's see how the Worker handles the message. Enter the Worker:
// The Worker's receive method pattern-matches into the following code
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  // Wrap the driver's information in a DriverRunner object
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  // Register the runner under its driver ID
  drivers(driverId) = driver
  // Start the driver
  driver.start()
  // Update the number of cores used on this worker
  coresUsed += driverDesc.cores
  // Update the memory used on this worker
  memoryUsed += driverDesc.mem
Once the DriverRunner object is constructed, its start method is called to launch the Driver:
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  // Launch the driver on a new thread
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // prepare driver jars and run driver
        // The exit code determines the driver's final state
        val exitCode = prepareAndRunDriver()

        // set final state depending on if forcibly killed and process exit code
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // notify worker of final driver state, possible exception
      // Send the driver's final state to the worker
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
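The pattern start() uses, a dedicated thread that runs a blocking task, guards it with a shutdown hook, and reports a final state when the task ends, can be sketched as follows. RunnerSketch and its state objects are hypothetical names, and Java's Runtime shutdown hooks stand in for Spark's ShutdownHookManager:

```scala
object RunnerSketch {
  sealed trait FinalState
  case object Finished extends FinalState
  case object Failed extends FinalState
  case object Errored extends FinalState

  // Runs `task` on its own thread; `report` receives the final state,
  // mirroring worker.send(DriverStateChanged(...)).
  def start(task: () => Int)(report: FinalState => Unit): Thread = {
    val t = new Thread("runner-sketch") {
      override def run(): Unit = {
        // Shutdown hook so the child is cleaned up if the JVM exits mid-run.
        val hook = new Thread(new Runnable {
          def run(): Unit = println("shutting down, killing child")
        })
        Runtime.getRuntime.addShutdownHook(hook)
        try {
          val exitCode = task()                 // blocking, like prepareAndRunDriver()
          report(if (exitCode == 0) Finished else Failed)
        } catch {
          case _: Exception => report(Errored)  // like DriverState.ERROR
        } finally {
          Runtime.getRuntime.removeShutdownHook(hook)
        }
      }
    }
    t.start()
    t
  }
}
```

The real DriverRunner additionally distinguishes KILLED (a nonzero exit after an explicit kill()), which this sketch omits.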
Next, enter the prepareAndRunDriver method:
private[worker] def prepareAndRunDriver(): Int = {
  // Create the driver's working directory
  val driverDir = createWorkingDirectory()
  // Download the user jar into that directory
  val localJarFilename = downloadUserJar(driverDir)

  // Substitute placeholder arguments with their concrete values
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // TODO: If we add ability to submit multiple jars they should also be added here
  // Build a ProcessBuilder for the command that launches the driver
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  // Run the command to launch the driver
  runDriver(builder, driverDir, driverDesc.supervise)
}
The code above prepares the Driver's runtime environment and builds the command that launches it, finally calling the runDriver method. Enter that method:
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
  // Set the driver's working directory
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    // Create the stdout file and redirect the process's InputStream to it
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)

    // Create the stderr file for capturing error output
    val stderr = new File(baseDir, "stderr")
    // Format the launch command
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    // Redirect the process's error output to the stderr file
    Files.append(header, stderr, StandardCharsets.UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
The code above mainly creates the files that capture the process's logs, and finally calls the runCommandWithRetry method:
private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  // Initialize the exit code to -1
  var exitCode = -1
  // Time to wait between submission retries.
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off.
  val successfulRunDuration = 5
  // keepTrying starts as true unless the runner was already killed
  var keepTrying = !killed

  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

    synchronized {
      // If the runner has been killed, return the exit code immediately
      if (killed) { return exitCode }
      // Start the process; this is where the command actually launches the driver
      process = Some(command.start())
      initialize(process.get)
    }

    val processStart = clock.getTimeMillis()
    exitCode = process.get.waitFor()

    // check if attempting another run
    keepTrying = supervise && exitCode != 0 && !killed
    if (keepTrying) {
      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
        waitSeconds = 1
      }
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }
  }

  // Return the exit code
  exitCode
}
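Stripped of process management, the supervise/retry policy above reduces to a small loop: rerun while the command keeps failing, double the wait between attempts, and reset the back-off after a sufficiently long run. RetryWithBackoff is a hypothetical name, and the injectable sleep parameter exists only to make the sketch testable:

```scala
object RetryWithBackoff {
  // Reruns `runOnce` while it fails (when supervised), with exponential back-off.
  def run(runOnce: () => Int,
          supervise: Boolean,
          successfulRunMillis: Long = 5000L,
          sleep: Long => Unit = Thread.sleep): Int = {
    var exitCode = -1
    var waitSeconds = 1
    var keepTrying = true
    while (keepTrying) {
      val start = System.currentTimeMillis()
      exitCode = runOnce()
      // Retry only when supervised and the run failed.
      keepTrying = supervise && exitCode != 0
      if (keepTrying) {
        // A run that lasted long enough resets the back-off.
        if (System.currentTimeMillis() - start > successfulRunMillis) waitSeconds = 1
        sleep(waitSeconds * 1000L)
        waitSeconds *= 2 // exponential back-off
      }
    }
    exitCode
  }
}
```

The reset-after-a-long-run rule matters for supervised drivers: a driver that ran for hours before crashing should be restarted quickly, not after a back-off accumulated from earlier failures.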
Back in the start method, the exit code determines whether the Driver's final state is FINISHED, KILLED, or FAILED, and that state is sent to the Worker. Let's look at how the Worker handles the message:
case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
  handleDriverStateChanged(driverStateChanged)
It calls the handleDriverStateChanged method; enter that method:
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  // Get the driver's ID
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  // Get the driver's state
  val state = driverStateChanged.state
  // Log an appropriate message for the state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  // Forward the driver's state to the Master
  sendToMaster(driverStateChanged)
  // Remove the driver from the running drivers map
  val driver = drivers.remove(driverId).get
  // Record the driver as finished
  finishedDrivers(driverId) = driver
  // Trim finished drivers from the list if necessary
  trimFinishedDriversIfNecessary()
  // Give back the memory and cores the driver was using
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}
The main step is that the worker forwards the DriverStateChanged message to the Master, which then handles it accordingly. Enter the Master:
case DriverStateChanged(driverId, state, exception) =>
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      // Any of these four final states triggers removeDriver
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
Upon receiving the message, the Master calls the removeDriver method to remove the driver:
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the drivers set
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to the completedDrivers buffer
      completedDrivers += driver
      // Remove the driver from the persistence engine as well
      persistenceEngine.removeDriver(driver)
      // Update the driver's state to its final state
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from its worker
      driver.worker.foreach(w => w.removeDriver(driver))
      // Finally, schedule again
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
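The RETAINED_DRIVERS handling above is a common bounded-history pattern: keep at most N completed entries, and when the buffer is full, drop the oldest tenth (at least one) before appending. A minimal sketch with illustrative names:

```scala
import scala.collection.mutable.ArrayBuffer

object RetainedHistory {
  // Append `item`, first evicting the oldest entries if the buffer is at capacity,
  // mirroring the completedDrivers trimming in removeDriver.
  def append[A](completed: ArrayBuffer[A], item: A, retained: Int): Unit = {
    if (completed.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      completed.remove(0, toRemove) // like completedDrivers.trimStart(toRemove)
    }
    completed += item
  }
}
```

Dropping a batch (roughly 10%) rather than a single entry keeps the eviction cost amortized when many drivers finish in quick succession.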
This covers the entire process of launching the Driver and removing it after completion, that is, its whole lifecycle.