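A Beginner's Path Through the Spark Source Code - 7: Storage Management Source - Part 1: BlockManagerMaster
Last time we looked at how MemoryPool is implemented in Spark's memory management. This time we work from the bottom up and study Spark's storage management. The storage package is structured as follows:
We start our study of storage management with Block management. As the package listing above shows, Block management mainly consists of two parts, BlockManagerMaster and BlockManager. Like MemoryManager, BlockManager is a component of SparkEnv, as shown in the figure below: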
1. BlockManagerMaster
Before it initializes the BlockManager, SparkEnv first creates a BlockManagerMaster:
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
This calls the following method:
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
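On the driver node, this registers the endpoint with rpcEnv and returns its RpcEndpointRef. On any other node, it instead looks up a reference to the endpoint that lives on the driver.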
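One detail worth noticing is that endpointCreator is a by-name parameter (`=> RpcEndpoint`), so the endpoint object is only constructed on the branch that actually uses it. The following is a minimal sketch of that behavior. The names Endpoint, EndpointRef, registerOrLookup and newEndpoint are hypothetical stand-ins, not Spark's real classes.

```scala
object EndpointLookupSketch {
  // Hypothetical stand-ins for RpcEndpoint / RpcEndpointRef; not Spark's real classes.
  final class Endpoint(val name: String)
  final case class EndpointRef(name: String, local: Boolean)

  // Counts how many endpoints were actually constructed.
  var created = 0

  def newEndpoint(name: String): Endpoint = {
    created += 1
    new Endpoint(name)
  }

  def registerOrLookup(
      isDriver: Boolean,
      name: String,
      endpointCreator: => Endpoint): EndpointRef = {
    if (isDriver) {
      // The by-name argument is evaluated here, exactly once.
      val endpoint = endpointCreator
      EndpointRef(endpoint.name, local = true)
    } else {
      // endpointCreator is never evaluated on this branch.
      EndpointRef(name, local = false)
    }
  }
}
```

Calling registerOrLookup with isDriver = false and `newEndpoint(...)` as the third argument leaves the created counter unchanged, which is why passing `new BlockManagerMasterEndpoint(...)` on every node costs nothing on the executors.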
BlockManagerMaster mainly consists of methods for managing executors, blocks, and BlockManagers:
1.1 Registering a BlockManager
/**
 * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
 * topology information. This information is obtained from the master and we respond with an
 * updated BlockManagerId fleshed out with this information.
 */
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $blockManagerId")
  // Send a synchronous RPC to the driver's BlockManagerMasterEndpoint and wait for the updated id
  val updatedId = driverEndpoint.askSync[BlockManagerId](
    RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  logInfo(s"Registered BlockManager $updatedId")
  updatedId
}
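Here the BlockManager sends a registration request to the BlockManagerMaster. On the receiving side the request is handled in BlockManagerMasterEndpoint.receiveAndReply, which pattern-matches on every kind of request: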
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
    context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))

  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))

  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  case GetLocationsAndStatus(blockId) =>
    context.reply(getLocationsAndStatus(blockId))

  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))

  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))

  case GetExecutorEndpointRef(executorId) =>
    context.reply(getExecutorEndpointRef(executorId))

  case GetMemoryStatus =>
    context.reply(memoryStatus)

  case GetStorageStatus =>
    context.reply(storageStatus)

  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))

  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))

  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))

  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))

  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))

  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)

  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)

  case StopBlockManagerMaster =>
    context.reply(true)
    stop()

  case BlockManagerHeartbeat(blockManagerId) =>
    context.reply(heartbeatReceived(blockManagerId))

  case HasCachedBlocks(executorId) =>
    blockManagerIdByExecutor.get(executorId) match {
      case Some(bm) =>
        if (blockManagerInfo.contains(bm)) {
          val bmInfo = blockManagerInfo(bm)
          context.reply(bmInfo.cachedBlocks.nonEmpty)
        } else {
          context.reply(false)
        }
      case None => context.reply(false)
    }
}
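The other operations follow roughly the same flow: each one is carried out through an RPC request to the remote endpoint. The RPC messages are all defined as a series of case classes.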
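To make the message style concrete, here is a small sketch of case-class messages dispatched through a PartialFunction, in the same shape as receiveAndReply. It is an illustration under simplifying assumptions: the messages Register, GetLocations and GetMemoryStatus, the MasterEndpoint class, and the askSync helper are hypothetical and much smaller than Spark's real ones, and the reply goes to a plain callback rather than an RpcCallContext.

```scala
import scala.collection.mutable

object MessageDispatchSketch {
  // Hypothetical, simplified messages; Spark's real ones carry more fields.
  sealed trait ToMaster
  final case class Register(executorId: String, maxMem: Long) extends ToMaster
  final case class GetLocations(blockId: String) extends ToMaster
  case object GetMemoryStatus extends ToMaster

  final class MasterEndpoint {
    private val maxMemByExecutor = mutable.Map.empty[String, Long]
    private val locations = mutable.Map.empty[String, Seq[String]]

    // One case per message type; the reply is handed to a callback.
    def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
      case Register(executorId, maxMem) =>
        maxMemByExecutor(executorId) = maxMem
        reply(executorId)
      case GetLocations(blockId) =>
        reply(locations.getOrElse(blockId, Seq.empty))
      case GetMemoryStatus =>
        reply(maxMemByExecutor.toMap)
    }
  }

  // Sends one message and returns the reply, loosely like a synchronous ask.
  def askSync(endpoint: MasterEndpoint, message: Any): Any = {
    var result: Any = null
    endpoint.receiveAndReply(r => result = r)(message)
    result
  }
}
```

Because each message is a case class or case object, adding a new operation in this style means adding one message type and one matching case.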
2. BlockManager
/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that [[initialize()]] must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
2.1 Initialization
Next, let's look at the initialize method:
/**
 * Initializes the BlockManager with the given appId. This is not performed in the constructor as
 * the appId may not be known at BlockManager instantiation time (in particular for the driver,
 * where it is only learned after registration with the TaskScheduler).
 *
 * This method initializes the BlockTransferService and ShuffleClient, registers with the
 * BlockManagerMaster, starts the BlockManagerWorker endpoint, and registers with a local shuffle
 * service if configured.
 */
def initialize(appId: String): Unit = {
  blockTransferService.init(this)
  shuffleClient.init(appId)

  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }

  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

  val idFromMaster = master.registerBlockManager(
    id,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)

  blockManagerId = if (idFromMaster != null) idFromMaster else id

  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }

  logInfo(s"Initialized BlockManager: $blockManagerId")
}
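The two-phase pattern used here (construct first, then call initialize once the appId is known, and prefer the id returned by the master) can be sketched in isolation. This is a simplified illustration, not Spark's code: ManagerId, Master, Manager and currentId are hypothetical names, and the sketch only records the appId where the real method passes it to shuffleClient.init.

```scala
object TwoPhaseInitSketch {
  // Hypothetical, simplified id; the real BlockManagerId also carries a port.
  final case class ManagerId(executorId: String, host: String, topology: Option[String])

  // Stand-in for the master: returns the id enriched with topology info, or null.
  trait Master {
    def register(id: ManagerId): ManagerId
  }

  final class Manager(executorId: String, host: String, master: Master) {
    // Neither field is valid until initialize() has been called.
    private var appId: Option[String] = None
    private var managerId: Option[ManagerId] = None

    def initialize(app: String): Unit = {
      appId = Some(app)
      val id = ManagerId(executorId, host, None)
      val idFromMaster = master.register(id)
      // Prefer the id returned by the master; fall back to the local one.
      managerId = Some(if (idFromMaster != null) idFromMaster else id)
    }

    def currentId: ManagerId =
      managerId.getOrElse(throw new IllegalStateException("initialize() has not been called"))
  }
}
```

Using an Option for the late-bound fields makes the "not valid until initialize()" rule fail loudly instead of silently exposing a null.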
2.2 Fetching Block Data
Next, let's look at the interface for fetching local block data:
/**
 * Interface to get local block data. Throws an exception if the block cannot be found or
 * cannot be read successfully.
 */
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    getLocalBytes(blockId) match {
      case Some(blockData) =>
        new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
      case None =>
        // If this block manager receives a request for a block that it doesn't have then it's
        // likely that the master has outdated block statuses for this block. Therefore, we send
        // an RPC so that this block is marked as being unavailable from this block manager.
        reportBlockStatus(blockId, BlockStatus.empty)
        throw new BlockNotFoundException(blockId.toString)
    }
  }
}
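The miss path above does two things in order: it first corrects the master's stale view, and only then fails the request. A minimal sketch of that "report, then throw" shape follows. LocalStore, BlockNotFound and reportedMissing are hypothetical names, and the report is just recorded in a buffer rather than sent over RPC.

```scala
import scala.collection.mutable.ArrayBuffer

object BlockLookupSketch {
  final class BlockNotFound(blockId: String)
    extends RuntimeException(s"Block $blockId not found")

  final class LocalStore(blocks: Map[String, Array[Byte]]) {
    // Block ids reported to the (hypothetical) master as no longer held here.
    val reportedMissing = ArrayBuffer.empty[String]

    def getBlockData(blockId: String): Array[Byte] =
      blocks.get(blockId) match {
        case Some(bytes) => bytes
        case None =>
          // Correct the stale status first, then fail the request.
          reportedMissing += blockId
          throw new BlockNotFound(blockId)
      }
  }
}
```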
This calls getLocalBytes to fetch the block data in serialized form:
/**
 * Get block from the local block manager as serialized bytes.
 */
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, return it
  // without acquiring a lock; the disk store never deletes (recent) items so this should work
  if (blockId.isShuffle) {
    val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
    // TODO: This should gracefully handle case where local block is not available. Currently
    // downstream code will throw an exception.
    // Try to fetch the block data
    val buf = new ChunkedByteBuffer(
      shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
    Some(new ByteBufferBlockData(buf, true))
  } else {
    // Take a read lock so the data cannot change while it is being read
    blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
  }
}
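For non-shuffle blocks, the method above takes a read lock on the data while reading it. Next, let's look at how that lock is released.
BlockManager provides a releaseLock method for this: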
/**
 * Release a lock on the given block with explicit TID.
 * The param `taskAttemptId` should be passed in case we can't get the correct TID from
 * TaskContext, for example, the input iterator of a cached RDD iterates to the end in a child
 * thread.
 */
def releaseLock(blockId: BlockId, taskAttemptId: Option[Long] = None): Unit = {
  blockInfoManager.unlock(blockId, taskAttemptId)
}
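It simply delegates to blockInfoManager.unlock.
Let's first look at blockInfoManager. It is the component of BlockManager that tracks block metadata and manages block locking. The locking interface it exposes is a readers-writer lock. Every lock acquisition is associated with a running task, and locks are released automatically when the task completes or fails.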
/**
 * Component of the [[BlockManager]] which tracks metadata for blocks and manages block locking.
 *
 * The locking interface exposed by this class is readers-writer lock. Every lock acquisition is
 * automatically associated with a running task and locks are automatically released upon task
 * completion or failure.
 *
 * This class is thread-safe.
 */
private[storage] class BlockInfoManager extends Logging
Now let's look at how the lock is actually released:
/**
 * Release a lock on the given block.
 * In case a TaskContext is not propagated properly to all child threads for the task, we fail to
 * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock.
 *
 * See SPARK-18406 for more discussion of this issue.
 */
def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
  val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
  logTrace(s"Task $taskId releasing lock for $blockId")
  val info = get(blockId).getOrElse {
    throw new IllegalStateException(s"Block $blockId not found")
  }
  if (info.writerTask != BlockInfo.NO_WRITER) {
    // The block is write-locked: clear the writer and remove the binding
    // between this task and the block
    info.writerTask = BlockInfo.NO_WRITER
    writeLocksByTask.removeBinding(taskId, blockId)
  } else {
    // Otherwise the block is read-locked: decrement the block's reader count
    // and this task's pin count for the block
    assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
    info.readerCount -= 1
    val countsForTask = readLocksByTask(taskId)
    val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
    assert(newPinCountForTask >= 0,
      s"Task $taskId release lock on block $blockId more times than it acquired it")
  }
  notifyAll()
}
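The bookkeeping in unlock is easier to follow next to the matching acquire side. Below is a small readers-writer lock sketch in the same spirit. It is an assumption-laden simplification, not the real BlockInfoManager: block ids are plain strings, the per-task pin counts live in an ordinary map instead of a multiset, and the hypothetical tryLockForReading and tryLockForWriting return false instead of blocking and being woken by notifyAll().

```scala
import scala.collection.mutable

object BlockLockSketch {
  val NoWriter: Long = -1L

  // Per-block lock state: number of readers and the task holding the write lock.
  final class Info {
    var readerCount: Int = 0
    var writerTask: Long = NoWriter
  }

  final class LockManager {
    private val infos = mutable.Map.empty[String, Info]
    // Per-task read pin counts, keyed by (taskId, blockId).
    private val readPins = mutable.Map.empty[(Long, String), Int]

    def register(blockId: String): Unit = synchronized { infos(blockId) = new Info }

    // Non-blocking: fails if the block is unknown or currently write-locked.
    def tryLockForReading(blockId: String, taskId: Long): Boolean = synchronized {
      infos.get(blockId) match {
        case Some(info) if info.writerTask == NoWriter =>
          info.readerCount += 1
          readPins((taskId, blockId)) = readPins.getOrElse((taskId, blockId), 0) + 1
          true
        case _ => false
      }
    }

    // Non-blocking: fails if the block is unknown or has any reader or writer.
    def tryLockForWriting(blockId: String, taskId: Long): Boolean = synchronized {
      infos.get(blockId) match {
        case Some(info) if info.writerTask == NoWriter && info.readerCount == 0 =>
          info.writerTask = taskId
          true
        case _ => false
      }
    }

    def unlock(blockId: String, taskId: Long): Unit = synchronized {
      val info = infos.getOrElse(blockId,
        throw new IllegalStateException(s"Block $blockId not found"))
      if (info.writerTask != NoWriter) {
        // Write-locked: clear the writer.
        info.writerTask = NoWriter
      } else {
        // Read-locked: decrement the block's reader count and the task's pin count.
        assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
        val pins = readPins.getOrElse((taskId, blockId), 0) - 1
        assert(pins >= 0, s"Task $taskId released $blockId more times than it acquired it")
        info.readerCount -= 1
        readPins((taskId, blockId)) = pins
      }
    }

    def readerCount(blockId: String): Int = synchronized { infos(blockId).readerCount }
  }
}
```

Tracking pins per task as well as per block is what lets the manager detect a task that releases a lock more often than it acquired it, and release everything a task still holds when it finishes.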
That is the whole process of fetching local block data.