Master active/standby failover mechanism
Contents
3. Master.scala => completeRecovery() analysis
1. Meaning
This refers to Master failover: the failed Active Master is replaced by the Standby Master.
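2. Process
(1) (Rescue, reusing the earlier data) Use the persistence engine (FileSystemPersistenceEngine or ZooKeeperPersistenceEngine) to read the persisted storedApps, storedDrivers and storedWorkers
(2) (Re-register with the Master) Re-register the Applications, Drivers and Workers found in the persisted data into the Master's in-memory cache structures
(3) (Master reverse registration) In the Master, set the state of every Application and Worker to UNKNOWN, then send the Standby Master's address to each Application's Driver and to each Worker
(4) (Check Drivers and Workers) A component that is running normally replies to the Master, and its state is changed to something other than UNKNOWN
(5) (Master filtering) Call completeRecovery() to remove the Drivers and Workers that are not running normally
(6) (Reallocation) Call schedule()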
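The six steps above can be sketched with a toy model. This is a minimal illustration, not Spark's code: ToyMaster, Component, beginRecovery and acknowledge are hypothetical names, the persisted data is reduced to a list of ids, and Alive/Unknown stand in for Spark's richer state enumerations.

```scala
import scala.collection.mutable

object RecoveryFlowSketch {
  // Hypothetical, simplified stand-ins for Spark's worker/application states.
  sealed trait State
  case object Alive extends State
  case object Unknown extends State

  final case class Component(id: String, var state: State)

  /** Toy standby Master; `persisted` plays the role of the persistence engine's data. */
  final class ToyMaster(persisted: Seq[String]) {
    val registered = mutable.LinkedHashMap.empty[String, Component]

    // Steps (1)-(3): read the persisted ids, re-register them, mark each UNKNOWN.
    def beginRecovery(): Unit =
      persisted.foreach(id => registered(id) = Component(id, Unknown))

    // Step (4): a healthy component answers the new Master and leaves UNKNOWN.
    def acknowledge(id: String): Unit =
      registered.get(id).foreach(c => c.state = Alive)

    // Step (5): drop everything that never answered; returns the removed ids.
    def completeRecovery(): Seq[String] = {
      val dead = registered.values.filter(_.state == Unknown).map(_.id).toList
      dead.foreach(id => registered.remove(id))
      dead
    }
  }
}
```

In this sketch a component that never calls acknowledge stays UNKNOWN and is dropped by completeRecovery(); step (6), schedule(), is left out because the toy model has no resources to allocate.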
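3. Master.scala => completeRecovery() analysis
(1) Check whether the Master's RecoveryState is RECOVERING (return otherwise), then change it to COMPLETING_RECOVERY
(2) Filter workers for those still UNKNOWN and call removeWorker() on each
(3) Filter apps for those still UNKNOWN and call finishApplication() on each
(4) Filter drivers for those with driver.worker.isEmpty and iterate over them
(5) If driver.desc.supervise is set, call relaunchDriver()
(6) Otherwise call removeDriver()
(7) Call schedule()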
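The state check in step (1) and the driver handling in steps (4) to (6) can be sketched as pure functions. This is a simplified illustration under stated assumptions: Driver, Plan, planDrivers and nextState are hypothetical names, and the RecoveryState enumeration here only mimics the one in Spark.

```scala
object CompleteRecoverySketch {
  // Simplified copy of the recovery states named in the notes (assumption).
  object RecoveryState extends Enumeration {
    val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
  }

  // Hypothetical flattened driver record: hasWorker mirrors driver.worker.isDefined,
  // supervise mirrors driver.desc.supervise.
  final case class Driver(id: String, hasWorker: Boolean, supervise: Boolean)
  final case class Plan(relaunch: Seq[String], remove: Seq[String])

  // Step (1): only a RECOVERING master may move on to COMPLETING_RECOVERY.
  def nextState(current: RecoveryState.Value): Option[RecoveryState.Value] =
    if (current == RecoveryState.RECOVERING) Some(RecoveryState.COMPLETING_RECOVERY)
    else None

  // Steps (4)-(6): drivers without a worker are relaunched if supervised, else removed.
  def planDrivers(drivers: Seq[Driver]): Plan = {
    val orphans = drivers.filter(d => !d.hasWorker)
    val (relaunch, remove) = orphans.partition(_.supervise)
    Plan(relaunch.map(_.id), remove.map(_.id))
  }
}
```

Drivers that still have a worker are left alone; only the orphaned ones are split into the relaunch and remove groups.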
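4. Key methods
(1) Master.removeWorker()
1> Set the worker's state to DEAD
2> Iterate over the worker's executors and call removeExecutor() for each
3> Iterate over the worker's drivers:
if desc.supervise is set, call relaunchDriver()
otherwise call removeDriver()
4> Finally, call removeWorker() on persistenceEngine to remove the worker from persistent storage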
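The order of operations in removeWorker() can be traced with a small stand-alone model. This is a sketch, not the Spark implementation: Worker, Driver and the string log are hypothetical, and a mutable set of worker ids stands in for the persistence engine.

```scala
import scala.collection.mutable

object RemoveWorkerSketch {
  final case class Driver(id: String, supervise: Boolean)

  final class Worker(val id: String, val executors: Seq[String], val drivers: Seq[Driver]) {
    var state: String = "ALIVE"
  }

  /** Returns a log of the calls the toy removeWorker would make, in order. */
  def removeWorker(worker: Worker, persisted: mutable.Set[String]): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    worker.state = "DEAD"                                          // 1> mark the worker dead
    worker.executors.foreach(e => log += s"removeExecutor($e)")   // 2> drop its executors
    worker.drivers.foreach { d =>                                  // 3> relaunch or remove drivers
      if (d.supervise) log += s"relaunchDriver(${d.id})"
      else log += s"removeDriver(${d.id})"
    }
    persisted -= worker.id                                         // 4> remove from persistent storage
    log.toList
  }
}
```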
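(2) ApplicationInfo.removeExecutor()
Removes the executor by ID and gives back the CPU cores it was granted
if (executors.contains(exec.id)) {
  removedExecutors += executors(exec.id)  // remember the executor as removed
  executors -= exec.id                    // drop it from the live map
  coresGranted -= exec.cores              // give its cores back
}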
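A self-contained version of this bookkeeping, assuming a simplified ExecutorDesc with only an id and a core count, shows why the contains check matters: removing the same executor twice must not subtract its cores twice. AppInfo and addExecutor are hypothetical helpers added for the illustration.

```scala
import scala.collection.mutable

object RemoveExecutorSketch {
  // Hypothetical, trimmed-down executor description.
  final case class ExecutorDesc(id: Int, cores: Int)

  final class AppInfo {
    val executors = mutable.HashMap.empty[Int, ExecutorDesc]
    val removedExecutors = mutable.ArrayBuffer.empty[ExecutorDesc]
    var coresGranted = 0

    def addExecutor(exec: ExecutorDesc): Unit = {
      executors(exec.id) = exec
      coresGranted += exec.cores
    }

    // Same logic as the excerpt above: a no-op when the executor is not registered.
    def removeExecutor(exec: ExecutorDesc): Unit =
      if (executors.contains(exec.id)) {
        removedExecutors += executors(exec.id)
        executors -= exec.id
        coresGranted -= exec.cores
      }
  }
}
```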
(3) Master.relaunchDriver()
driver.worker = None                    // detach the driver from its lost worker
driver.state = DriverState.RELAUNCHING
waitingDrivers += driver                // queue it so it can be scheduled again
schedule()
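(4) Master.removeDriver() (excerpt)
drivers.find(d => d.id == driverId) match {
  case Some(driver) =>
    persistenceEngine.removeDriver(driver)              // remove from persistent storage
    driver.worker.foreach(w => w.removeDriver(driver))  // remove from the worker's cache
    schedule()
  case None =>  // unknown driver ID: nothing to remove
}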
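The contrast between relaunchDriver() and removeDriver() can be seen side by side in a toy master. This is a sketch under simplifying assumptions: ToyMaster, the string-valued state, the persisted id set and the scheduleCalls counter are all hypothetical, and the worker-side cleanup is omitted.

```scala
import scala.collection.mutable

object DriverLifecycleSketch {
  final class Driver(val id: String) {
    var worker: Option[String] = None
    var state: String = "RUNNING"
  }

  final class ToyMaster {
    val drivers = mutable.ArrayBuffer.empty[Driver]
    val waitingDrivers = mutable.ArrayBuffer.empty[Driver]
    val persisted = mutable.Set.empty[String]   // stand-in for the persistence engine
    var scheduleCalls = 0
    def schedule(): Unit = scheduleCalls += 1

    // Relaunch: the driver stays registered and is queued for scheduling again.
    def relaunchDriver(driver: Driver): Unit = {
      driver.worker = None
      driver.state = "RELAUNCHING"
      waitingDrivers += driver
      schedule()
    }

    // Remove: the driver is forgotten; returns false for an unknown id.
    def removeDriver(driverId: String): Boolean =
      drivers.find(_.id == driverId) match {
        case Some(driver) =>
          drivers -= driver
          persisted -= driver.id
          schedule()
          true
        case None => false
      }
  }
}
```

Both paths end in schedule(), since either outcome changes what the master has to place on workers.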
(5) Master.finishApplication() => removeApplication(app, ApplicationState.FINISHED)
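(6) Master.removeApplication() (excerpt)
if (apps.contains(app)) {
  // a: an old completed application being pruned from the history
  applicationMetricsSystem.removeSource(a.appSource)
  // exec: each executor of app, removed from its worker's cache
  exec.worker.removeExecutor(exec)
  // remove the application from persistent storage
  persistenceEngine.removeApplication(app)
  schedule()
}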
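5. Summary of the cleanup mechanism
(1) Remove from the Master's in-memory cache structures
(2) Remove from the in-memory caches of related components
(3) Remove from persistent storage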
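The three cleanup layers can be illustrated with a toy model in which a driver id is recorded in all three places and then removed from each. This is only a sketch: ToyMaster, ToyWorker, launch, removeEverywhere and isKnownAnywhere are hypothetical names, and a mutable set stands in for persistent storage.

```scala
import scala.collection.mutable

object CleanupLayersSketch {
  final class ToyWorker { val drivers = mutable.Set.empty[String] }

  final class ToyMaster {
    val drivers = mutable.Set.empty[String]              // (1) the master's in-memory cache
    val workers = mutable.Map.empty[String, ToyWorker]   // (2) caches of related components
    val persisted = mutable.Set.empty[String]            // (3) stand-in for persistent storage

    // Record the driver in all three layers.
    def launch(driverId: String, workerId: String): Unit = {
      drivers += driverId
      workers.getOrElseUpdate(workerId, new ToyWorker).drivers += driverId
      persisted += driverId
    }

    // Cleanup has to touch every layer, otherwise a stale entry survives.
    def removeEverywhere(driverId: String): Unit = {
      drivers -= driverId
      workers.values.foreach(w => w.drivers -= driverId)
      persisted -= driverId
    }

    def isKnownAnywhere(driverId: String): Boolean =
      drivers(driverId) || workers.values.exists(w => w.drivers(driverId)) || persisted(driverId)
  }
}
```

Skipping the persisted layer in particular would let the removed entry be read back during the next recovery.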