Spark Streaming 原理剖析

通过源码呈现 Spark Streaming 的底层机制。

  1. 初始化与接收数据
Spark Streaming 通过分布在各个节点上的接收器缓存接收到的流数据并将流数 据 包 装 成 Spark 能 够 处 理 的 RDD的格式 输入到Spark Streaming 之 后由Spark  Streaming将作业提交到Spark集群进行执行如图1所示。

Spark Streaming 原理剖析

 

            图 1  Spark Streaming 执行模型

  初始化的过程主要可以概括为两点
1调度器的初始化。
调度器调度 Spark Streaming 的运行用户可以通过配置相关参数进行调优。
2将输入流的接收器转化为 RDD 在集群进行分布式分配然后启动接收器集合中的每个接收器。
针对不同的数据源 Spark Streaming 提供了不同的数据接收器分布在各个节点上的每个接收器可以认为是一个特定的进程接收一部分流数据作为输入。
用户也可以针对自身生产环境状况自定义开发相应的数据接收器。
如图 2 所示接收器分布在各个节点上。通过下面代码创建并行的、在不同Worker 节点分布的 receiver 集合。

Spark Streaming 原理剖析
 val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r,
Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
// 在这里创造 RDD 相当于进入 SparkContext.makeRDD
// 此处将 receivers 的集合作为一个 RDD 进行分区 RDD[Receiver]
// 即使是只有一个输入流按照这个分布式也是流的输入端在 worker 而不再 Master
// 将 receivers 的集合打散然后启动它们
…
ssc.sparkContext.runJob(tempRDD, startReceiver)
…
}
Spark Streaming 原理剖析

 

Spark Streaming 原理剖析

                            图  2 Spark Streaming 接收器

  

 

2. 数据接收与转化
在上面的“初始化与接收数据”部分中已经介绍过 receiver 集合转换为 RDD在集群上分布式地接收数据流。那么每个 receiver 是怎样接收并处理数据流的呢读者可以通过图 3对输入流的处理有一个全面的了解。图 3为 Spark Streaming 数据接收与转化的示意图。
图 3 的主要流程如下。
1数据缓冲在 receiver 的 receive 函数中接收流数据将接收到的数据源源不断地放入到 BlockGenerator.currentBuffer。
2缓冲数据转化为数据块在 BlockGenerator 中有一个定时器RecurringTimer将 当 前 缓 冲 区 中 的 数 据 以 用 户 定 义 的 时 间 间 隔 封 装 为 一 个 数 据 块 Block 放 入 到
BlockGenerator 的 blocksForPush 队列中这个队列。
3数据块转化为 Spark 数据块在 BlockGenerator 中有一个 BlockPushingThread线程不断地将 blocksForPush 队列中的块传递给 BlockManager让 BlockManager 将
数据存储为块。 BlockManager 负责 Spark 中的块管理。
4元数据存储在 pushArrayBuffer 方法中还会将已经由 BlockManager 存储的元数据信息例如 Block 的 id 号传递给 ReceiverTracker ReceiverTracker 会将存储的
blockId 放到对应 StreamId 的队列中。

Spark Streaming 原理剖析

                  图 3 Spark Streaming 数据接收与转化

  图中部分组件的作用如下
  KeepPushingBlocks调用此方法持续写入和保持数据块。
  pushArrayBuffer调用 pushArrayBuffer 方法将数据块存储到 BlockManager 中。
  reportPushedBlock存储完成后汇报数据块信息到主节点。
  receivedBlockInfo Meta Data已经接收到的数据块元数据记录。
  streamId数据流 Id。
  BlockInfo数据块元数据信息。
  BlockManager.put数据块存储器写入备份数据块到其他节点。
  Receiver 数据块接收器接收数据块。
  BlockGenerator数据块生成器将数据缓存生成 Spark 能处理的数据块。
  BlockGenerator.currentBuffer 缓存网络接收的数据记录等待之后转换为 Spark的数据块。
  BlockGenerator.blocksForPushing 将一块连续数据记录暂存为数据块待后续转换为 Spark 能够处理的 BlockManager 中的数据块A Block As a BlockManager’s Block)。
  BlockGenerator.blockPushingThread守护线程负责将数据块转换为 BlockManager中数据块。
  ReceiveTracker输入数据块的元数据管理器负责管理和记录数据块。
  BlockManager Spark 数据块管理器负责数据块在内存或磁盘的管理。
   RecurringTimer时间触发器每隔一定时间进行缓存数据的转换。

  上面的过程中涉及最多的类就是 BlockGenerator在数据转化的过程中其扮演者不可或缺的角色。

private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf
) extends Logging

 



 

 

3. 生成 RDD 与提交 Spark Job
Spark Streaming 根据时间段将数据切分为 RDD然后触发 RDD 的 Action 提交 Job Job 被 提 交 到 Job Manager 中 的 Job Queue 中 由 Job Scheduler 调 度 之 后Job Scheduler 将 Job 提交到 Spark 的 Job 调度器然后将 Job 转换为大量的任务分发给 Spark 集群执行如图 4 所示。

Spark Streaming 原理剖析

                图 4    Spark Streaming 调度模型

   Job generator 中通过下面的方法生成 Job 进行调度和执行。
从下面的代码可以看出 job 是从 outputStream 中生成的然后再触发反向回溯执行
整个 DStream DAG类似 RDD 的机制。

Spark Streaming 原理剖析
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
// 获取输入数据块的元数据信息
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
. . .
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor !DoCheckpoint(time)
}
// 下 面 进 入 JobScheduler 的 submitJobSet 方 法 一 探 究 竟 JobScheduler 是 整 个 Spark
Streaming 调度的核心组件
def submitJobSet(jobSet: JobSet) {
. . .
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
. . .
}
// 进入 Graph 生成 job 的方法 Graph 本质是 DStreamGraph 类生成的对象
final private[streaming] class DStreamGraph extends Serializable with

Logging {
def generateJobs(time: Time): Seq[Job] = {
. . .
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
. . .
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
. . .
}
// outputStreams 中的对象是 DStream下面进入 DStream 的 generateJob 一探究竟
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
// 此处相当于针对每个时间段生成的一个 RDD会调用 SparkContext 的方法 runJob 提交 Spark 的一
个 Job
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
// 在 DStream 算是父类一些具体的 DStream 例如 SocketInputStream 等的类的父类可以通过
SocketInputDStream 看是如何通过上面的 getOrCompute 生成 RDD 的
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
generatedRDDs.get(time) match {
. . .
case None => {
if (isTimeValid(time)) {
// Dstream 是个父类这里代表的是子类的 compute 方法 DStream 通过 compute 调用用户自定
义函数。当任务执行时同一个 stage 中的 DStream 函数会串联依次执行
compute(time) match {
. . .
generatedRDDs.put(time, newRDD)
. . .
}
在 SocketInputDStream 的 compute 方法中生成了对应时间片的 RDD
override def compute(validTime: Time): Option[RDD[T]] = {
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)

receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}
Spark Streaming 原理剖析

  Spark Streaming 在保证实时处理的要求下还能够保证高吞吐与容错性。用户的数据分析中很多情况下也存在需要分析图数据运行图算法通过 GraphX 可以简便地开发分布式图分析算法。

 


本文转自大数据躺过的坑博客园博客原文链接http://www.cnblogs.com/zlslch/p/5725374.html如需转载请自行联系原作者