Flink 1.9源码学习02 ----JobManager启动源码分析
在上一篇我们看到jobmanager的启动类org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
来看一下StandaloneSessionClusterEntrypoint启动类的一些重要的方法:
--->>>
我们先看main方法:
ClusterEntrypoint是一个抽象类,
--->>>
调用它的方法:
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) { final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName(); try { //在这里启动了集群 clusterEntrypoint.startCluster(); } catch (ClusterEntrypointException e) { LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e); System.exit(STARTUP_FAILURE_RETURN_CODE); } clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> { final int returnCode; if (throwable != null) { returnCode = RUNTIME_FAILURE_RETURN_CODE; } else { returnCode = applicationStatus.processExitCode(); } LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable); System.exit(returnCode); }); }
--->>>
启动集群的具体方法:
public void startCluster() throws ClusterEntrypointException { LOG.info("Starting {}.", getClass().getSimpleName()); try { configureFileSystems(configuration); SecurityContext securityContext = installSecurityContext(configuration); securityContext.runSecured((Callable<Void>) () -> { //启动集群,传入配置文件参数 runCluster(configuration); return null; }); } catch (Throwable t) { final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class); try { // clean up any partial state shutDownAsync( ApplicationStatus.FAILED, ExceptionUtils.stringifyException(strippedThrowable), false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { strippedThrowable.addSuppressed(e); } throw new ClusterEntrypointException( String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()), strippedThrowable); } }
--->>>
我们看看 runCluster这个方法做了啥子:
private void runCluster(Configuration configuration) throws Exception { synchronized (lock) { //todo 初始化了一些ClusterEntrypoint.java中的一些服务,比如:HA,blob,heartbeat,metricRegistry这些 initializeServices(configuration); //todo 将host信息写入配置 configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration); //todo 将工厂真正开启接口,其中包括了一些创建以及启动ResourceManager(有用于请求solt的RPC,初始化所有solt到resourceManager的soltManager的RPC(这个会在jobmanager接收到jobGraph后调用),TM心跳等),启动web服务 clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore, new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), this); clusterComponent.getShutDownFuture().whenComplete( (ApplicationStatus applicationStatus, Throwable throwable) -> { //todo 出错了,关闭方法 if (throwable != null) { shutDownAsync( ApplicationStatus.UNKNOWN, ExceptionUtils.stringifyException(throwable), false); } else { //todo 这是一般的关闭路径。如果是单独的更具体的关闭 //todo 已经触发,这将什么也不做 shutDownAsync( applicationStatus, null, true); } }); } }
--->>>
具体的ResourceManager的初始化:
点进去:
这是一个接口:
DispatcherResourceManagerComponentFactory
我们看到抽象类
AbstractDispatcherResourceManagerComponentFactory 实现了这个接口
创建resourceManager 用于接收slot的请求:
我们发现它是一个接口:
来看类:
package org.apache.flink.runtime.resourcemanager;
这是一个枚举类:
public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<ResourceID> {
}
点进去看看发现这个类创建了一个继承了resourceManager 这个抽象类
Resourcemanager 这个抽象类:
它
继承ResourceIDRetrievable> 继承FencedRpcEndpoint<ResourceManagerId> 实现 ResourceManagerGateway, LeaderContender {
这个接口下的几个重要的RPC方法具体实现:
1)向resourceManager请求slot:
2)这个rpc想resourceManager发送包括像taskManagaer有多少可分配的solt,哪些已分配的solt,solt的状态等
--->>>
最后创建完毕之后调用启动方法:
调用的是rpcServer
--->>>
完事之后开始调度了:
--->>>
创建了一个Dispatcher调度对象
看下Dispatcher是用来干嘛的(StandaloneDispatcher都是调用了父类的初始化方法super()创建一个Dispatcher.java对象)
来看一下Dispatcher实现了什么接口(ResourceManager同理)
看它实现的接口:
里面有提交job:
ok,我们再返回去找到具体的实现方法:
点进去:
最后:
他实现了submitJob()接口用于启动一个RPC,接受参数可以看到接受到一个JobGraph,这就意味着这和job任务启动有关,后面随缘更新到job启动Graph转换会提到
回到前面的Dispatcher.start()将传入的rpcService启动起来了,等待接受来自Driver端提交上来的JobGraph差不多启动完成了