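In-Depth Source Code Analysis of How the Spark Streaming Data Stream Is Periodically Converted from currentBuffer to Block - Spark in Commercial Practice
- Spark in Commercial Practice - Spark's Built-in RPC Communication Mechanism and the RpcEnv Infrastructure
- Spark in Commercial Practice - Flow Analysis of the Spark Event Listener Bus
- Spark in Commercial Practice - The Underlying Architecture of the Spark Storage System
- Spark in Commercial Practice - Execution Flow of Spark's Multiple Underlying MessageLoop Threads
- Spark in Commercial Practice - Spark's Two-Level Scheduling System: the Stage Division Algorithm and Optimal Task Scheduling in Detail
- Spark in Commercial Practice - Spark Delay Scheduling and the Scheduling Pool Architecture
- Spark in Commercial Practice - AppendOnlyMap, the Task-Level Caching, Aggregation, and Sorting Structure, in Detail
- Spark in Commercial Practice - Design of the ExternalSorter in the Spark Shuffle Process
- Spark in Commercial Practice - Design of the ShuffleExternalSorter in the Spark Shuffle Process
- Spark in Commercial Practice - Design of SortShuffleWriter, the In-Memory Buffer Writer of Spark's ShuffleManager
- Spark in Commercial Practice - Design of UnsafeShuffleWriter, the In-Memory Buffer Writer of Spark's ShuffleManager
- Spark in Commercial Practice - Design of BypassMergeSortShuffleWriter, the In-Memory Buffer Writer of Spark's ShuffleManager
- Spark in Commercial Practice - Core Principles of BlockStoreShuffleReader, a Key Spark Shuffle Component
- Spark in Commercial Practice - Core Principles of SortShuffleManager, the Spark Shuffle Manager
- Spark in Commercial Practice - The StreamingContext Startup Flow and the DStream Template Source Code
- Spark in Commercial Practice - ReceiverTracker Startup and the Receiver RDD Job Submission Mechanism
- Spark in Commercial Practice - In-Depth Source Code Analysis of How the Spark Streaming Data Stream Is Periodically Converted from currentBuffer to Block
- Spark in Commercial Practice - In-Depth Source Code Analysis of JobGenerator's Periodic Job Processing Logic in Spark Streaming
- Spark in Commercial Practice - In-Depth Source Code Analysis of the Spark Streaming Graph Processing Chain Iteration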
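1 ReceiverTracker: Pull One Thread and the Whole System Moves
- The figure below traces how ReceiverTracker submits the job for the receiver RDD. The yellow area on the right is the focus of this section, ReceiverSupervisorImpl, which starts the receiver and drives Block generation.
- As the figure below shows, the supervisor is the parent class. When startReceiverFunc calls its start() method, two startup steps run in order: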
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
(1) The first step runs ReceiverSupervisorImpl's onStart() method, which starts every generator in registeredBlockGenerators and so begins generating and managing the data blocks.
override protected def onStart() {
  registeredBlockGenerators.asScala.foreach { _.start() }
}
(2) The second step, startReceiver, first calls ReceiverSupervisorImpl's onReceiverStart method to check whether the receiver was registered successfully with ReceiverTracker. Only if registration succeeds is the receiver started.
The supervisor's startReceiver method:
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {  // <= key step: registers the receiver over the RPC endpoint
      logInfo(s"Starting receiver $streamId")
      receiverState = Started
      receiver.onStart()  // <= key step
      logInfo(s"Called receiver $streamId onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
ReceiverSupervisorImpl's onReceiverStart method:
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askSync[Boolean](msg)
}
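The ordering described above (block generators first, then a registration request, then the receiver only if the driver agrees) can be illustrated with a minimal sketch. The names `MiniSupervisor`, `MiniSupervisorImpl`, `events`, and `driverAccepts` are hypothetical stand-ins introduced for illustration; they are not Spark classes, and the Boolean flag merely models the reply that `askSync` would return.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for the supervisor template; not Spark's class.
abstract class MiniSupervisor {
  // Records the order in which the startup steps happen.
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // Hooks that a concrete supervisor fills in.
  protected def onStart(): Unit
  protected def onReceiverStart(): Boolean

  // Same call order as the start() method shown above.
  def start(): Unit = {
    onStart()
    startReceiver()
  }

  // The receiver is started only when registration succeeded.
  def startReceiver(): Unit = synchronized {
    if (onReceiverStart()) {
      events += "receiver started"
    } else {
      events += "receiver refused by driver"
    }
  }
}

// `driverAccepts` is an assumption that models the tracker's Boolean reply.
class MiniSupervisorImpl(driverAccepts: Boolean) extends MiniSupervisor {
  override protected def onStart(): Unit = {
    events += "block generators started"
  }

  override protected def onReceiverStart(): Boolean = {
    events += "register request sent"
    driverAccepts
  }
}
```

Calling `new MiniSupervisorImpl(driverAccepts = true).start()` records the three steps in order. The template-method split keeps the ordering in the parent class while the RPC details live in the subclass.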
(3) How ReceiverTracker handles receiver registration requests
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    // <= key step: registerReceiver updates receiverTrackingInfos
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }
  case DeregisterReceiver(streamId, message, error) =>
    deregisterReceiver(streamId, message, error)
    context.reply(true)
  // Local messages
  case AllReceiverIds =>
    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
  case GetAllReceiverInfo =>
    context.reply(receiverTrackingInfos.toMap)
  case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)
}
(4) How registerReceiver maintains receiverTrackingInfos
/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }
  if (isTrackerStopping || isTrackerStopped) {
    return false
  }
  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
    // This receiver is registering and it's scheduled by
    // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
    scheduledLocations.get
  } else {
    // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
    // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
    scheduleReceiver(streamId)
  }
  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }
  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)  // <= key step
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
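The acceptance check inside registerReceiver is easy to misread, so here is a minimal sketch of the same matching rule in isolation. The types `Loc`, `HostLoc`, and `ExecutorLoc` and the object `AcceptanceCheck` are hypothetical simplifications invented for this example; Spark's real types are TaskLocation and ExecutorCacheTaskLocation, as in the listing above.

```scala
// Hypothetical location types used only for this sketch.
sealed trait Loc { def host: String }
final case class HostLoc(host: String) extends Loc
final case class ExecutorLoc(host: String, executorId: String) extends Loc

object AcceptanceCheck {
  // A registering receiver is acceptable when some candidate location matches:
  // executor-level candidates are compared by executor id,
  // host-level candidates are compared by host name.
  def isAcceptable(candidates: Seq[Loc], host: String, executorId: String): Boolean =
    candidates.exists {
      case ExecutorLoc(_, id) => id == executorId
      case HostLoc(h)         => h == host
    }
}
```

With candidates `Seq(ExecutorLoc("host1", "exec-1"), HostLoc("host2"))`, a receiver on executor `exec-1` is accepted, and so is any executor on `host2`, while a different executor on `host1` is refused. Matching the more specific executor case first mirrors the order of the cases in the original method.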
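- The overall architecture of ReceiverSupervisorImpl is shown below:
2 BlockGenerator in Depth
2.1 How SocketInputDStream Receives and Stores the Data Stream
- The receive loop depends on ReceiverSupervisor: every record read from the socket is handed to store().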
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  try {
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next())  // <= key step
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    onStop()
  }
}
- store() depends on ReceiverSupervisor's pushSingle method
/**
 * Store a single item of received data to Spark's memory.
 * These single items will be aggregated together into data blocks before
 * being pushed into Spark's memory.
 */
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)  // <= key step
}
- pushSingle depends on the defaultBlockGenerator held inside ReceiverSupervisor
/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
  defaultBlockGenerator.addData(data)  // <= key step
}
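2.2 BlockGenerator: Plain but Powerful
- Brother 1: blockIntervalTimer
- Brother 2: blockPushingThread
BlockGenerator keeps things simple: two threads between them handle the storage and management of blocks. The timer period comes from the block interval setting: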
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
The first thread:
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
The second thread:
private val blockPushingThread = new Thread() {
  override def run() { keepPushingBlocks() }
}
- Both brothers are started together
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()  // <= key step
    blockPushingThread.start()  // <= key step
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
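Because blockIntervalTimer fires once per `spark.streaming.blockInterval`, the block interval determines roughly how many blocks a receiver produces in each batch. The sketch below is only back-of-the-envelope arithmetic: the object `BlockMath`, the function `blocksPerBatch`, and the 2-second batch interval used in the example are assumptions made for illustration and do not come from the Spark source. It also assumes data arrives during every block interval, since an empty buffer produces no block.

```scala
object BlockMath {
  // Approximate number of blocks one receiver emits per batch,
  // assuming at least one record arrives in every block interval.
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0, "block interval must be positive")
    batchIntervalMs / blockIntervalMs
  }
}
```

For example, `BlockMath.blocksPerBatch(2000L, 200L)` gives 10 blocks per batch for the default 200 ms block interval and an assumed 2 s batch interval.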
2.3 BlockGenerator: Accumulate First, Release Later
- The water tank (currentBuffer): buffers the individual records of the stream as they arrive
@volatile private var currentBuffer = new ArrayBuffer[Any]
- The water buckets (blocksForPushing): the tank's contents collected into buckets, that is, Blocks
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
- The tank is filled with records arriving from the InputDStream
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
- blockIntervalTimer periodically turns the contents of the tank into a bucket and queues it for pushing
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)  // <= key step
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)  // <= key step
      }
    }
    if (newBlock != null) {
      blocksForPushing.put(newBlock)  // <= key step: put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
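The buffer swap in updateCurrentBuffer can be tried out without Spark. The following is a minimal sketch under stated assumptions: `MiniBlock` and `MiniBlockGenerator` are hypothetical names, the timer is replaced by calling `updateCurrentBuffer` by hand, and the block id is a plain string that only imitates the "receiver id plus interval start time" idea from the listing above.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of the buffer-to-block hand-off; not Spark's BlockGenerator.
final case class MiniBlock(id: String, records: ArrayBuffer[Any])

class MiniBlockGenerator(receiverId: Int, blockIntervalMs: Long, queueSize: Int) {
  // The "water tank": records accumulate here between timer ticks.
  @volatile private var currentBuffer = new ArrayBuffer[Any]

  // The "buckets": finished blocks waiting to be pushed.
  val blocksForPushing = new ArrayBlockingQueue[MiniBlock](queueSize)

  // Called for every received record.
  def addData(data: Any): Unit = synchronized {
    currentBuffer += data
  }

  // Called once per block interval; `time` is the tick time in milliseconds.
  def updateCurrentBuffer(time: Long): Unit = {
    var newBlock: MiniBlock = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        // Swap in a fresh buffer so writers are blocked only for the swap itself.
        val filled = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        newBlock = MiniBlock(s"input-$receiverId-${time - blockIntervalMs}", filled)
      }
    }
    // put() blocks when the queue is full, so it stays outside the lock.
    if (newBlock != null) blocksForPushing.put(newBlock)
  }
}
```

Adding two records and then calling `updateCurrentBuffer(400L)` with a 200 ms interval queues one block whose id ends in `200`, the start of the interval the records belong to. A tick with an empty buffer queues nothing.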
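- keepPushingBlocks then takes over
2.4 keepPushingBlocks Inside BlockGenerator
- The blockPushingThread thread calls defaultBlockGeneratorListener, which stores the Block through blockManager.putBytes and then reports it to the ReceiverTracker on the driver with an AddBlock(blockInfo) message.
- Through this endpoint communication, ReceiverTracker learns the storage metadata of every Block.
- Depending on whether the write-ahead log is enabled, the handler is either WriteAheadLogBasedBlockHandler or BlockManagerBasedBlockHandler. The switch is read with conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false).
- All of this is driven by ReceiverSupervisorImpl, whose members include registeredBlockGenerators, defaultBlockGeneratorListener, trackerEndpoint, and the other key collaborators. From starting the receiver, to generating blocks, to managing them and reporting them to ReceiverTracker, it covers the entire path.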
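The article does not list the body of keepPushingBlocks, so the following is only a sketch of one common way to write such a consumer loop: poll the queue with a short timeout while blocks are still being generated, then drain whatever is left so that nothing is lost on shutdown. The class `MiniBlockPusher`, the `push` callback, the `stopGenerating` method, and the 10 ms timeout are all assumptions made for this example, and blocks are modelled as plain strings.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical sketch of a block-pushing loop; not Spark's implementation.
class MiniBlockPusher(queue: ArrayBlockingQueue[String], push: String => Unit) {
  @volatile private var generating = true

  // Signals that no further blocks will be produced.
  def stopGenerating(): Unit = {
    generating = false
  }

  def keepPushingBlocks(): Unit = {
    // While blocks may still arrive, wait briefly for the next one.
    while (generating) {
      Option(queue.poll(10, TimeUnit.MILLISECONDS)).foreach(push)
    }
    // Generation has stopped: drain the remaining blocks.
    while (!queue.isEmpty) {
      push(queue.take())
    }
  }
}
```

In this sketch `push` stands in for the listener that stores a block and reports it to the driver. Using a timed poll instead of a blocking take lets the loop notice the stop signal even when the queue is empty.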
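3 Summary
What we have just watched is ReceiverSupervisorImpl's one-man show: it directs and performs the whole end-to-end path itself, from starting the receiver to reporting blocks to the driver.
Qin Kaixin (秦凯新), Shenzhen, 2018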