Spark 内核解析

该篇我们将会从如下几个方向进行讲解Spark的内核机制:

    • Spark的通信架构

    • Spark的脚本解析

    • Spark的交互流程

    • Spark的Shuffle过程

    • Spark的内存管理

    • Spark的部署模式

 

1.Spark 通信架构

    Spark作为分布式计算框架,多个节点的设计与相互通信模式是其重要的组成部分。Spark一开始使用 Akka 作为内部通信部件。在Spark 1.3年代,为了解决大块数据(如Shuffle)的传输问题,Spark引入了Netty通信框架。到了 Spark 1.6, Spark可以配置使用 Akka 或者 Netty 了,这意味着 Netty 可以完全替代 Akka了。再到 Spark 2, Spark 已经完全抛弃 Akka了,全部使用Netty了。

    为什么呢?官方的解释是:

    1) 很多Spark用户也使用Akka,但是由于Akka不同版本之间无法互相通信,这就要求用户必须使用跟Spark完全一样的Akka版本,导致用户无法升级Akka。

    2) Spark的Akka配置是针对Spark自身来调优的,可能跟用户自己代码中的Akka配置冲突。

    3) Spark用的Akka特性很少,这部分特性很容易自己实现。同时,这部分代码量相比Akka来说少很多,debug比较容易。如果遇到什么bug,也可以自己马上fix,不需要等Akka上游发布新版本。而且,Spark升级Akka本身又因为第一点会强制要求用户升级他们使用的Akka,对于某些用户来说是不现实的。

1.1 通信组件概览

    对源码分析,对于设计思路理解如下:

Spark 内核解析

    1)  RpcEndpoint:RPC端点 ,Spark针对于每个节点(Client/Master/Worker)都称之一个Rpc端点 ,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher

    2)  RpcEnv:RPC上下文环境,每个Rpc端点运行时依赖的上下文环境称之为RpcEnv

    3)  Dispatcher:消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己存入收件箱,如果指令接收方为非自身端点,则放入发件箱

    4)  Inbox:指令消息收件箱,一个本地端点对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部待Receiver Queue中,另外Dispatcher创建时会启动一个单独线程进行轮询Receiver Queue,进行收件箱消息消费

    5)  OutBox:指令消息发件箱,一个远程端点对应一个发件箱,当消息放入Outbox后,紧接着将消息通过TransportClient发送出去。消息放入发件箱以及发送过程是在同一个线程中进行,这样做的主要原因是远程消息分为RpcOutboxMessage, OneWayOutboxMessage两种消息,而针对于需要应答的消息直接发送且需要得到结果进行处理

    6)  TransportClient:Netty通信客户端,根据OutBox消息的receiver信息,请求对应远程TransportServer

    7)  TransportServer:Netty通信服务端,一个RPC端点一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱

注意:

    TransportClient与TransportServer通信虚线表示两个RpcEnv之间的通信,图示没有单独表达式

    一个Outbox一个TransportClient,图示没有单独表达式

    一个RpcEnv中存在两个RpcEndpoint,一个代表本身启动的RPC端点,另外一个为 RpcEndpointVerifier

 

2.Spark 脚本解析

Spark 内核解析

 

3.Spark 交互流程(Standalone模式讲解)

3.1 Spark的交互流程,节点启动流程:

Spark 内核解析

    1)  Master启动时首先创一个RpcEnv对象,负责管理所有通信逻辑

    2)  Master通过RpcEnv对象创建一个Endpoint,Master就是一个Endpoint,Worker可以与其进行通信

    3)  Worker启动时也是创一个RpcEnv对象

    4)  Worker通过RpcEnv对象创建一个Endpoint

    5)  Worker通过RpcEnv对,建立到Master的连接,获取到一个RpcEndpointRef对象,通过该对象可以与Master通信

    6)  Worker向Master注册,注册内容包括主机名、端口、CPU Core数量、内存数量

    7)  Master接收到Worker的注册,将注册信息维护在内存中的Table中,其中还包含了一个到Worker的RpcEndpointRef对象引用

    8)  Master回复Worker已经接收到注册,告知Worker已经注册成功

    9)  此时如果有用户提交Spark程序,Master需要协调启动Driver;而Worker端收到成功注册响应后,开始周期性向Master发送心跳

3.2 核心组件交互流程

    在Standalone模式下,Spark中各个组件之间交互还是比较复杂的,但是对于一个通用的分布式计算系统来说,这些都是非常重要而且比较基础的交互。首先,为了理解组件之间的主要交互流程,我们给出一些基本要点:

    一个Application会启动一个Driver

    一个Driver负责跟踪管理该Application运行过程中所有的资源状态和任务状态

    一个Driver会管理一组Executor

    一个Executor只执行属于一个Driver的Task

    核心组件之间的主要交互流程,如下图所示:

Spark 内核解析

    上图中,通过不同颜色或类型的线条,给出了如下6个核心的交互流程,我们会详细说明:

橙色:提交用户Spark程序

    用户提交一个Spark程序,主要的流程如下所示:

     1)  用户spark-submit脚本提交一个Spark程序,会创建一个ClientEndpoint对象,该对象负责与Master通信交互

    2)  ClientEndpoint向Master发送一个RequestSubmitDriver消息,表示提交用户程序

    3)  Master收到RequestSubmitDriver消息,向ClientEndpoint回复SubmitDriverResponse,表示用户程序已经完成注册

    4)  ClientEndpoint向Master发送RequestDriverStatus消息,请求Driver状态   

     5)  如果当前用户程序对应的Driver已经启动,则ClientEndpoint直接退出,完成提交用户程序

紫色:启动Driver进程

    当用户提交用户Spark程序后,需要启动Driver来处理用户程序的计算逻辑,完成计算任务,这时Master协调需要启动一个Driver,具体流程如下所示:

    1)  Maser内存中维护着用户提交计算的任务Application,每次内存结构变更都会触发调度,向Worker发送LaunchDriver请求

    2)  Worker收到LaunchDriver消息,会启动一个DriverRunner线程去执行LaunchDriver的任务

    3)  DriverRunner线程在Worker上启动一个新的JVM实例,该JVM实例内运行一个Driver进程,该Driver会创建SparkContext对象

红色:注册Application

    Dirver启动以后,它会创建SparkContext对象,初始化计算过程中必需的基本组件,并向Master注册Application,流程描述如下:

    1)  创建SparkEnv对象,创建并管理一些基本组件

    2)  创建TaskScheduler,负责Task调度

    3)  创建StandaloneSchedulerBackend,负责与ClusterManager进行资源协商

    4)  创建DriverEndpoint,其它组件可以与Driver进行通信

    5)  在StandaloneSchedulerBackend内部创建一个StandaloneAppClient,负责处理与Master的通信交互

    6)  StandaloneAppClient创建一个ClientEndpoint,实际负责与Master通信

    7)  ClientEndpoint向Master发送RegisterApplication消息,注册Application

    8)  Master收到RegisterApplication请求后,回复ClientEndpoint一个RegisteredApplication消息,表示已经注册成功

蓝色:启动Executor进程

    1)  Master向Worker发送LaunchExecutor消息,请求启动Executor;同时Master会向Driver发送ExecutorAdded消息,表示Master已经新增了一个Executor(此时还未启动)

    2)  Worker收到LaunchExecutor消息,会启动一个ExecutorRunner线程去执行LaunchExecutor的任务

    3)  Worker向Master发送ExecutorStageChanged消息,通知Executor状态已发生变化

    4)  Master向Driver发送ExecutorUpdated消息,此时Executor已经启动

粉色:启动Task执行

    1)  StandaloneSchedulerBackend启动一个DriverEndpoint

    2)  DriverEndpoint启动后,会周期性地检查Driver维护的Executor的状态,如果有空闲的Executor便会调度任务执行

    3)  DriverEndpoint向TaskScheduler发送Resource Offer请求

    4)  如果有可用资源启动Task,则DriverEndpoint向Executor发送LaunchTask请求

    5)  Executor进程内部的CoarseGrainedExecutorBackend调用内部的Executor线程的launchTask方法启动Task

    6)  Executor线程内部维护一个线程池,创建一个TaskRunner线程并提交到线程池执行

绿色:Task运行完成

    1)  Executor进程内部的Executor线程通知CoarseGrainedExecutorBackend,Task运行完成

    2)  CoarseGrainedExecutorBackend向DriverEndpoint发送StatusUpdated消息,通知Driver运行的Task状态发生变更

    3)  StandaloneSchedulerBackend调用TaskScheduler的updateStatus方法更新Task状态

    4)  StandaloneSchedulerBackend继续调用TaskScheduler的resourceOffers方法,调度其他任务运行

 

(5)整体应用

    用户通过spark-submit提交或者运行spark-shell REPL,集群创建Driver,Driver加载Application,最后Application根据用户代码转化为RDD,RDD分解为Tasks,Executor执行Task等系列知识,整体交互蓝图如下:

Spark 内核解析

1)    Client运行时向Master发送启动驱动申请(发送RequestSubmitDriver指令)

2)    Master调度可用Worker资源进行驱动安装(发送LaunchDriver指令)

3)    Worker运行DriverRunner进行驱动加载,并向Master发送应用注册请求(发送RegisterApplication指令)

4)    Master调度可用Worker资源进行应用的Executor安装(发送LaunchExecutor指令)

5)    Executor安装完毕后向Driver注册驱动可用Executor资源(发送RegisterExecutor指令)

6)    最后是运行用户代码时,通过DAGScheduler,TaskScheduler封装为可以执行的TaskSetManager对象

7)    TaskSetManager对象与Driver中的Executor资源进行匹配,在队形的Executor中发布任务(发送LaunchTask指令)

8)    TaskRunner执行完毕后,调用DriverRunner提交给DAGScheduler,循环7.直到任务完成

 

 

 

9.Spark Shuffle过程

MapReduce的Shuffle过程介绍

    Shuffle的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。

    为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuffle来获取数据。

    从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程,如图所示:

Spark 内核解析

      1.    Spill过程

Spill过程包括输出、排序、溢写、合并等步骤,如图所示:

Spark 内核解析