Spark Streaming运行架构以及代码详解

1. 运行架构
spark Streaming相对其他流处理系统最大的优势在于流处理引擎和数据处理在同一软件栈,其中Spark Streaming功能主要包括流处理引擎的流数据接收与存储以及批处理作业的生成与管理,而Spark Core负责处理Spark Streaming发送过来的作业。Spark Streaming分为Driver端和Client端,运行在Driver端为StreamingContext实例,该实例包括DStreamGraph和JobScheduler(包括ReceiverTracker和JobGenerator)等,而Client包括ReceiverSupervisor和Receiver等。
Spark Streaming进行流数据处理大致可以分为:启动流数据引擎、接收及存储流数据、处理流数据和输出处理结果等4个步骤。
Spark Streaming运行架构以及代码详解

2. Spark Streaming各个组件
1.StreamingContext: Spark Streaming 中Driver端的上下文对象,初始化的时候会构造Spark Streaming应用程序需要使用的组件,比如DStreamGraph,JobScheduler等。
2.DStreamGraph:用于保存DStream和DStream之间依赖关系等信息。
3.JobScheduler: 主要用于调度job。JobScheduler主要通过JobGenerator产生job,并且通过ReceiverTracker管理流数据接收器Receiver。
4.JobGenerator: 主要是从DStream产生job, 且根据指定时间执行checkpoint. 他维护了一个定时器,该定时器在批处理时间到来的时候会进行生成作业的操作。
5.ReceiverTracker: 管理各个Executor上的Receiver的元数据。它在启动的时候,需要根据流数据接收器Receiver分发策略通知对应的Executor中的ReceiverSupervisor(接收器管理着)启动,然后再由ReceiverSupervisor来启动对应节点的Receiver
6.ReceiverTrackerEndpoint: ReceiverTracker用于通信的RPC终端。
7.Receiver:数据接收器,用于接收数据,通过ReceiverSupervisor将数据交给ReceiveBlockHandler来处理。
8.ReceiverSupervisor:主要用于管理各个worker节点上的Receivor,比如启动worker上的Receiver,或者是转存数据,交给ReceiveBlockHandler来处理;数据转存完毕,将数据存储的元信息汇报给ReceiverTracker,由它来负责管理收到的数据块元信息。
9.BlockGenerator: 这个类的主要作用是创建Receiver接收的数据的batches,然后根据时间间隔命名为合适的block. 并且把准备就绪的batches作为block 推送到BlockManager。
10.ReceiveBlockHandler:主要根据是否启用WAL预写日志的机制,区分为预写日志和非预写日志存储。非预写日志则是直接将数据通过BlockManager写入Worker的内存或者磁盘;而预写日志则是在预写日志的同时把数据写入Worker的内存或者磁盘。
11.ReceiverSchedulingPolicy: Receiver调度策略

3. 运行原理剖析

  1. 1、初始化StreamingContext对象,在该对象启动过程中实例化DStreamGraph 和 JobScheduler,其中DStreamGraph用于存放DStream以及DStram之间的依赖关系等信息;而JobScheduler中包括ReceiverTracker和JobSGenerator,其中ReceiverTracker为Driver端流数据接收器(Receiver)的管理者,JobGenerator为批处理作业生成器。在ReceiverTracker启动过程中,根据流数据接收器分发策略通知对应的Executor中的流数据接收管理器(ReceiverSupervisor)启动,再由ReceiverSupervisor启动流数据接收器。
  2. 2、当流数据接收器Receiver启动后,持续不断地接收实时流数据,根据传过来数据的大小进行判断,如果数据量很小,则攒多条数据成一块,然后再进行块存储;如果数据量大,则直接进行块存储。对于这些数据Receiver直接交给ReceiverSupervisor,由其进行数据转储操作。块存储根据设置是否预写日志分为两种,一种是使用非预写日志BlockManagerBasedBlockHandler方法直接写到Worker的内存或磁盘中;另一种是进行预写日WriteAheadLogBasedBlockHandler方法,即在预写日志同时把数据写入到Worker的内存或磁盘中。数据存储完毕后,ReceiverSupervisor会把数据存储的元信息上报给ReceiverTracker,ReceiverTracker再把这些信息转发给ReceiverBlockTracker,由它负责管理收到数据块的元信息。
  3. 3、在StreamingContext的JobGenerator中维护一个定时器,该定时器在批处理时间到来会进行生成作业的操作,具体如下:
    1.通知ReceiverTracker将接收到的数据进行提交,在提交时采用synchronized关键字进行处理,保证每条数据被划入一个且只被划入一个批中。
    2.要求DStreamGraph根据DStream依赖关系生成作业序列Seq[Job]。
    3.从第一步中ReceiverTracker获取本批次数据的元数据。
    4.把批处理时间time、作业序列Seq[Job]和本批次数据的元数据包装为JobSet,调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler,JobScheduler将把这些作业发送给Spark Core进行处理,由于该操作是异步的,因为本操作执行速度非常快。
    5.只要提交结束(不管作业是否被执行),Spark Streaming对整个系统做一个检查点(Checkpoint)。
    4. 运行原理源码分析
    4.1. 启动流处理引擎
    Spark Streaming运行架构以及代码详解
    4.1.1. 初始化StreamingContext
    首先需要初始化StreamingContext,在初始化的过程中会对DStreamGraph、JobScheduler等进行初始化,DStreamGraph类似于RDD的有向无环图,包含DStream之间相互依赖的有向无环图;JobScheduler的作用是定时查看DStreamGraph,然后根据流入的数据生成运行作业。
    4.1.2. 创建InputDStream
    根据你采用不同的数据源,可能生成的输入数据流不一样。
    4.1.3. 启动JobScheduler
    创建完成InputDStream之后,调用StreamingContext的start方法启动应用程序,其最重要的就是启动JobScheduler。在启动JobScheduler的时候会实例化ReceiverTracker和JobGenerator。
    Spark Streaming运行架构以及代码详解
    4.1.4. 启动JobGenerator
    启动JobGenerator需要判断是否第一次运行,如果不是第一次运行需要进行上次检查点的恢复,如果是第一次运行则调用startFirstTime方法,在该方法中初始化了定时器的开启时间,并启动了DStreamGraph和定时器timer。
    Spark Streaming运行架构以及代码详解
    timer的getStartTime方法会计算出来下一个周期到期时间,计算公式: 当前时间 / 间隔时间。
    4.2. 接收及存储流数据
    4.2.1. 启动ReceiverTracker
    启动ReceiverTracker的时候,如果输入数据流不为空,则调用launchReceivers方法,然后他就会向ReceiverTrackerEndpoint发送StartAllReceivers方法,启动所有Receivers。
    Spark Streaming运行架构以及代码详解
    最后创建ReceiverSupervisor,并启动,在启动的时候,由它启动Receiver。
    4.2.2. Receiver启动并接收数据
    Receiver启动会调用各个具体子类的onstart方法,这里面就会接收数据,以kafka为例,则会根据提供配置创建连接,获取消息流,构造一个线程池,为每一个topic分区分配一个线程处理数据。
    Spark Streaming运行架构以及代码详解
    4.2.3. 启动BlockGenerator生成block
    在ReceiverSupervisorImpl的onstart方法中调用BlockGenerator的start启动BlockGenerator。
    Spark Streaming运行架构以及代码详解
    启动时候会先更新自身状态为Active,然后启动2个线程:
    blockIntervalTimer:定义开始一个新batch,然后准备把之前的batch作为一个block。
    blockPushingThread:把数据块 push到block manager。
    Spark Streaming运行架构以及代码详解
    4.2.4. 数据存储
    Receiver会进行数据的存储,如果数据量很少,则攒多条数据成数据块在进行块存储;如果数据量很大,则直接进行存储,对于需要攒多条数据成数据块的操作在Receiver.store方法里面调用ReceiverSupervisor的pushSingle方法处理。在pushSingle中把数据先保存在内存中,这些内存数据被BlockGenerator的定时器线程blockIntervalTimer加入队列并调用ReceiverSupervisor的pushArrayBuffer方法进行处理。
    他们其实都是调用的是pushAndReportBlock,该方法会调用ReceiveBlockHandler的storeBlock方法保存数据并根据配置进行预写日志;另外存储数据块并向driver报告:
    Spark Streaming运行架构以及代码详解
    4.3. 数据处理
    我们知道DStream在进行action操作时,会触发job。我们以saveAsTextFiles方法为例:
    Spark Streaming运行架构以及代码详解
    foreachRDD:它会向DStreamGraph注册,根据返回的当前的DStream然后创建ForEachDStream
    Spark Streaming运行架构以及代码详解
    register: 向DStreamGraph注册,即向DStreamGraph添加输出流
    Spark Streaming运行架构以及代码详解
    JobGenerator初始化的时候会构造一个timer定时器:
    Spark Streaming运行架构以及代码详解
    它会启动一个后台线程,不断去调用triggerActionForNextInterval方法,该方法就会不断调用processsEvent方法,并且传递GenerateJobs事件
    Spark Streaming运行架构以及代码详解
    JobGenerator# generateJobs

调用DStreamGraph的generateJobs方法产生job,然后利用JobScheduler开始提交job集合
Spark Streaming运行架构以及代码详解
DStreamGraph的generateJobs根据时间产生job集
Spark Streaming运行架构以及代码详解
然后调用DStream的generateJobs产生job
Spark Streaming运行架构以及代码详解
最后提交job集合
提交job集合,遍历每一个job,创建JobHandler,然后JobHandler是一个线程类,在其run方法中会向JobScheduler发送JobStarted事件,从而开始处理job。
Spark Streaming运行架构以及代码详解
Spark Streaming运行架构以及代码详解