Flink 1.9源码学习02 ----JobManager启动源码分析

在上一篇我们看到jobmanager的启动类org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

来看一下StandaloneSessionClusterEntrypoint启动类的一些重要的方法:

Flink 1.9源码学习02 ----JobManager启动源码分析

--->>>

 我们先看main方法:

Flink 1.9源码学习02 ----JobManager启动源码分析

ClusterEntrypoint是一个抽象类,

Flink 1.9源码学习02 ----JobManager启动源码分析

--->>>

调用它的方法:

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

   final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
   try {
     //在这里启动了集群
      clusterEntrypoint.startCluster();
   } catch (ClusterEntrypointException e) {
      LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
      System.exit(STARTUP_FAILURE_RETURN_CODE);
   }

   clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
      final int returnCode;

      if (throwable != null) {
         returnCode = RUNTIME_FAILURE_RETURN_CODE;
      } else {
         returnCode = applicationStatus.processExitCode();
      }

      LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
      System.exit(returnCode);
   });
}

--->>>

启动集群的具体方法:

public void startCluster() throws ClusterEntrypointException {
   LOG.info("Starting {}.", getClass().getSimpleName());

   try {

      configureFileSystems(configuration);

      SecurityContext securityContext = installSecurityContext(configuration);

      securityContext.runSecured((Callable<Void>) () -> {
         //启动集群,传入配置文件参数
         runCluster(configuration);

         return null;
      });
   } catch (Throwable t) {
      final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

      try {
         // clean up any partial state
         shutDownAsync(
            ApplicationStatus.FAILED,
            ExceptionUtils.stringifyException(strippedThrowable),
            false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
      } catch (InterruptedException | ExecutionException | TimeoutException e) {
         strippedThrowable.addSuppressed(e);
      }

      throw new ClusterEntrypointException(
         String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
         strippedThrowable);
   }
}

--->>>

我们看看 runCluster这个方法做了啥子:

private void runCluster(Configuration configuration) throws Exception {
   synchronized (lock) {

      //todo 初始化了一些ClusterEntrypoint.java中的一些服务,比如:HA,blob,heartbeat,metricRegistry这些
      initializeServices(configuration);

      //todo 将host信息写入配置
      configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
      configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

      final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

      //todo 将工厂真正开启接口,其中包括了一些创建以及启动ResourceManager(有用于请求solt的RPC,初始化所有solt到resourceManager的soltManager的RPC(这个会在jobmanager接收到jobGraph后调用),TM心跳等),启动web服务
      clusterComponent = dispatcherResourceManagerComponentFactory.create(
         configuration,
         commonRpcService,
         haServices,
         blobServer,
         heartbeatServices,
         metricRegistry,
         archivedExecutionGraphStore,
         new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
         this);

      clusterComponent.getShutDownFuture().whenComplete(
         (ApplicationStatus applicationStatus, Throwable throwable) -> {
            //todo 出错了,关闭方法
            if (throwable != null) {
               shutDownAsync(
                  ApplicationStatus.UNKNOWN,
                  ExceptionUtils.stringifyException(throwable),
                  false);
            } else {
               //todo 这是一般的关闭路径。如果是单独的更具体的关闭
               //todo 已经触发,这将什么也不做
               shutDownAsync(
                  applicationStatus,
                  null,
                  true);
            }
         });
   }
}

--->>>

具体的ResourceManager的初始化:

点进去:

Flink 1.9源码学习02 ----JobManager启动源码分析

这是一个接口:

DispatcherResourceManagerComponentFactory

Flink 1.9源码学习02 ----JobManager启动源码分析

我们看到抽象类

AbstractDispatcherResourceManagerComponentFactory 实现了这个接口

Flink 1.9源码学习02 ----JobManager启动源码分析

创建resourceManager 用于接收slot的请求:

Flink 1.9源码学习02 ----JobManager启动源码分析

我们发现它是一个接口:

Flink 1.9源码学习02 ----JobManager启动源码分析

来看类:

package org.apache.flink.runtime.resourcemanager;

这是一个枚举类:

public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<ResourceID> {

}

Flink 1.9源码学习02 ----JobManager启动源码分析

Flink 1.9源码学习02 ----JobManager启动源码分析

点进去看看发现这个类创建了一个继承了resourceManager 这个抽象类

Flink 1.9源码学习02 ----JobManager启动源码分析

Resourcemanager 这个抽象类:

继承ResourceIDRetrievable>
      继承FencedRpcEndpoint<ResourceManagerId>
      实现 ResourceManagerGateway, LeaderContender {

Flink 1.9源码学习02 ----JobManager启动源码分析

 

这个接口下的几个重要的RPC方法具体实现:

1)向resourceManager请求slot:

Flink 1.9源码学习02 ----JobManager启动源码分析

2)这个rpc想resourceManager发送包括像taskManagaer有多少可分配的solt,哪些已分配的solt,solt的状态等

Flink 1.9源码学习02 ----JobManager启动源码分析

--->>>

最后创建完毕之后调用启动方法:

Flink 1.9源码学习02 ----JobManager启动源码分析

调用的是rpcServer

Flink 1.9源码学习02 ----JobManager启动源码分析

--->>>

完事之后开始调度了:

Flink 1.9源码学习02 ----JobManager启动源码分析

 

--->>>

创建了一个Dispatcher调度对象

看下Dispatcher是用来干嘛的(StandaloneDispatcher都是调用了父类的初始化方法super()创建一个Dispatcher.java对象)

来看一下Dispatcher实现了什么接口(ResourceManager同理)

Flink 1.9源码学习02 ----JobManager启动源码分析

看它实现的接口:

里面有提交job:

Flink 1.9源码学习02 ----JobManager启动源码分析

ok,我们再返回去找到具体的实现方法:

 

Flink 1.9源码学习02 ----JobManager启动源码分析

点进去:

Flink 1.9源码学习02 ----JobManager启动源码分析

 

最后:

他实现了submitJob()接口用于启动一个RPC,接受参数可以看到接受到一个JobGraph,这就意味着这和job任务启动有关,后面随缘更新到job启动Graph转换会提到

回到前面的Dispatcher.start()将传入的rpcService启动起来了,等待接受来自Driver端提交上来的JobGraph差不多启动完成了

参考:https://www.cnblogs.com/ljygz/p/11405572.html