spark 笔记 15: ShuffleManager,shuffle map两端的stage/task的桥梁

  无论是Hadoop还是spark,shuffle操作都是决定其性能的重要因素。在不能减少shuffle的情况下,使用一个好的shuffle管理器也是优化性能的重要手段。
ShuffleManager的主要功能是在task直接传递数据,所以getWriter和getReader是它的主要接口。
大流程:
  1)需求方:当一个Stage依赖于一个shuffleMap的结果,那它在DAG分解的时候就能识别到这个依赖,并注册到shuffleManager;
  2)供应方:也就是shuffleMap,它在结束后,会将自己的结果注册到shuffleManager,并通知说自己已经结束了。
  3)这样,shuffleManager就将shuffle两段连接了起来。


spark提供了两个shuffle管理器:
  1)HashShuffleManager: 提供了HashShuffleReader和HashShuffleWriter两个方法。数据的写入是按照k-v对的形式写入的,可以自定义排序和聚合。
* A ShuffleManager using hashing, that creates one output file per reduce partition on each
* mapper (possibly reusing these across waves of tasks).
  2)SortShuffleManager: 数据按顺序写入。保存了一个blockId文件和blockId.index文件,用途不太清楚。

 引用一个别人的图来说明这个关系:
spark 笔记 15: ShuffleManager,shuffle map两端的stage/task的桥梁

shuffle的数据读取的函数为HashShuffleReader,它基本上直接调用了下面的流程:
这是一段写得极为紧密的代码,几乎每一行都带了大量的运算,看得特纠结。。
======================获取block数据的流程============================
->BlockStoreShuffleFetcher.fetch[T](shuffleId: Int,reduceId: Int,...) : Iterator[T] --获取shuffleId,ReduceId对应的数据块
->val blockManager = SparkEnv.get.blockManager  --获取数据块管理器
->val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) 
//--获取shuffleId和reduceId对应的数据‘目录’,statuses是以ManagerId为key的hash表,value是数据的大小
-> val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]] --创建解析数据‘目录’的缓冲
->for (((address, size), index) <- statuses.zipWithIndex) --遍历整个,并加了个索引。
->splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size)) --将‘目录’以地址为索引重新组织
->val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map { --‘目录’再次重新组织
//新的‘目录’格式为Seq[(BlockManagerId, Seq[(BlockId, Long(数据长度))])],因为需要BlockId获取数据
->case (address, splits) => (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
->val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer, shuffleMetrics) --获取多块block数据
-> new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer, readMetrics) --使用netty
->iter = BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics) --获取blocks的迭代器
->BasicBlockFetcherIterator 初始化
->protected val localBlocksToFetch = new ArrayBuffer[BlockId]()  --本地获取的blockId
->protected val remoteBlocksToFetch = new HashSet[BlockId]()  --远程获取的blockId
->protected val results = new LinkedBlockingQueue[FetchResult] --获取的结果放在这里
->protected val fetchRequests = new Queue[FetchRequest]  --需要发送出去的请求,主要是为了控制获取的速度
->iter.initialize()  --初始化这个迭代器,它会启动获取数据
->val remoteRequests = splitLocalRemoteBlocks() --将传入的block请求转换按块划分的请求。控制并发度。
->val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)  --允许最多五个节点同时获取数据
// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
->val remoteRequests = new ArrayBuffer[FetchRequest]  --请求块数组
->for ((address, blockInfos) <- blocksByAddress) { 遍历所有请求的blockId,以一个地址为单位
->if (address == blockManagerId) --本地获取
->localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)  本地允许所有同时获取,不控制并发
->else --远端机器
->val iterator = blockInfos.iterator --获得每个地址对应的blockId的迭代器
->while (iterator.hasNext) { --遍历一个地址的所有blockId
->val (blockId, size) = iterator.next()
->curBlocks += ((blockId, size)) --全部写到一个请求块里面
->if (curRequestSize >= targetRequestSize)  --如果一个请求块获取的数据太大了
->remoteRequests += new FetchRequest(address, curBlocks) --那么新建一个请求块
->curBlocks = new ArrayBuffer[(BlockId, Long)]  --创建新的请求块
->remoteRequests += new FetchRequest(address, curBlocks  )  将请求块封装到一个请求消息里面
->return remoteRequests 将所有的请求消息返回
->fetchRequests ++= Utils.randomize(remoteRequests) --// Add the remote requests into our queue in a random order随机打散
->while (!fetchRequests.isEmpty &&(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))
->sendRequest(fetchRequests.dequeue())  --发送请求数据的消息
->val cmId = new ConnectionManagerId(req.address.host, req.address.port)--连接一个地址的信息封装
->blockMessageArray = new BlockMessageArray(req.blocks.map {  遍历这个地址的所有的blockId
->case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))  -- 将请求信息再封装一次!!
->future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage) --异步执行
->future.onComplete { --future完成后回调
->case Success(message) => 成功获取数据
->val bufferMessage = message.asInstanceOf[BufferMessage] --获取到了数据块
->val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) --格式转换
->for (blockMessage <- blockMessageArray) --遍历所有获取到的数据块,写到results中
->results.put(new FetchResult(blockId, sizeMap(blockId), () => dataDeserialize(...)))
->logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
->getLocalBlocks()  --获取本地数据
->for (id <- localBlocksToFetch) 
->val iter = getLocalFromDisk(id, serializer).get --从本地磁盘获取
->results.put(new FetchResult(id, 0, () => iter)) --写入results中
->val itr = blockFetcherItr.flatMap(unpackBlock) //处理可能有失败的block的情况
->val completionIter = CompletionIterator[T, Iterator[T]](itr, { context.taskMetrics.updateShuffleReadMetrics()})
->new InterruptibleIterator[T](context, completionIter)  将获取的结果以迭代器的形式返回给上层。

根据shuffleId、reduceId获取结果数据的状态的函数包含了一个缓存功能,稍微复杂,独立拉出来
->MapOutputTracker::getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)]
->val statuses = mapStatuses.get(shuffleId).orNull
->if (statuses == null) --缓存没找到,到远端节点获取,在写入缓存
->if (fetching.contains(shuffleId))
->while (fetching.contains(shuffleId))
->fetching.wait()
->fetching += shuffleId
->val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
->val future = trackerActor.ask(message)(timeout)
->Await.result(future, timeout)
->fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
->mapStatuses.put(shuffleId, fetchedStatuses)
->fetching -= shuffleId
->fetching.notifyAll()
->result = MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
->statuses.map {status =>
->(status.location, decompressSize(status.compressedSizes(reduceId)))
->decompressSize(compressedSize: Byte)  --注意这个数据长度的编码很有意思
->math.pow(LOG_BASE, compressedSize & 0xFF).toLong, --LOG_BASE=1.1
->return result
work接收到消息的处理函数
->BlockManagerWorker::onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] --接收消息
->case bufferMessage: BufferMessage =>  接收到的是bufferMessage
->val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
->processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] --处理blockMessage
->case BlockMessage.TYPE_PUT_BLOCK => --put消息
->val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
->putBlock(pB.id, pB.data, pB.level) --调用写入函数
->blockManager.putBytes(id, bytes, level) --下面比较繁琐,以后再看
->doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
->case BlockMessage.TYPE_GET_BLOCK => { --get消息
->val gB = new GetBlock(blockMessage.getId)
->val buffer = getBlock(gB.id) --读取block
->val buffer = blockManager.getLocalBytes(id) --从本地磁盘读取block
->Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))  --返回给请求者的数据
=================================end====================================
shuffle接口类:
/**
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on both the
* driver and executors, based on the spark.shuffle.manager setting. The driver registers shuffles
*
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
* boolean isDriver as parameters.
*/
privatetrait /**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
def registerShuffleK, V, CInt,
Int,
K, V, C/** Get a writer for a given partition. Called on executors by map tasks. */
def getWriterK, V, Int, K, V/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
def getReaderK, C,
Int,
Int,
K, C/** Remove a shuffle's metadata from the ShuffleManager. */
def unregisterShuffleInt/** Shut down this ShuffleManager. */
def stopUnit


/**
* :: DeveloperApi ::
* . Note that in the case of shuffle,
* the .
*
* * @param * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
@DeveloperApi
class K, V, C@transient K, V,
val ,
val ,
val OrderingK,
val K, V, C,
val Boolean falseextends K, Voverride def rdd K, Vval shuffleIdInt val shuffleHandleenvshuffleId, , thiscleanerthis

/**
* A basic ShuffleHandle implementation that just captures registerShuffle's parameters.
*/
private[spark] class BaseShuffleHandle[K, V, C](
shuffleId: Int,
val numMaps: Int,
val dependency: ShuffleDependency[K, V, C])
extends ShuffleHandle(shuffleId)


/**
* Class that keeps track of the location of the map output of
* a stage. This is abstract because different versions of MapOutputTracker
* (driver and worker) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
一个shuffleTask的结果会以MapStatus的形式返回给调度器,包括map执行的机器的BlockManager的地址以及输出结果的大小。注意,这个size是经过压缩后的大小
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* The map output sizes are compressed using MapOutputTracker.compressSize.
*/
private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
extends Externalizable {

private[spark] class BlockMessage() {
// Un-initialized: typ = 0
// GetBlock: typ = 1
// GotBlock: typ = 2
// PutBlock: typ = 3
private var typ: Int = BlockMessage.TYPE_NON_INITIALIZED
private var id: BlockId = null
private var data: ByteBuffer = null
private var level: StorageLevel = null
hash和sort的shuffleManager的reader都是用了这个HashShuffleReader,BlockStoreShuffleFetcher.fetch做了大部分工作。
privateclass K, CK, , C,
Int,
Int,
extends K, Crequire1,
"Hash shuffle currently only supports fetching one partition"private val dep /** Read the combined key-values for this reduce task */
override def readIteratorK, Cval val getSerializerdepval , , , ,
val IteratorK, Cif depif depnew , dep, else new , dep, else if depdepthrow new "Aggregator is empty for map-side combine"else // Convert the Product2s to pairs since this is what downstream RDDs currently expect
IteratorK, C, // Sort the output if there is a sort ordering defined.
depmatch case SomeOrderingK// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val new K, C, CSome, SomememoryBytesSpilled diskBytesSpilled case /** Close this reader */
override def stopUnit ???
HashShuffleWriter: 这个只是对父类的writer做了每次写入一个k-v的封装,比较简单
privateclass K, VK, V, ,
Int,
extends K, Vwith private val blockManager getprivate val shuffleBlockManager blockManagershuffleBlockManager
private val ser getSerializerdepnullprivate val shuffle shuffleBlockManagerdepshuffleId, , numOutputSplits, ser,
writeMetrics/** Write a bunch of records to this task's output */
override def writeIteratorK, VUnit val if depif depdep, else else if depdepthrow new "Aggregator is empty for map-side combine"else for val dep

SortShuffleWriter会把数据按顺序写入,并且保持存blockId文件和blockId.index文件。
privateclass K, V, CK, V, C,
Int,
extends K, Vwith private val dep private val numPartitions depprivate val blockManager getprivate val ser getSerializerdepprivate val conf getprivate val fileBufferSize conf"spark.shuffle.file.buffer.kb", 321024

private var sorterK, V, null
private var outputFilenull
private var indexFilenull

// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
// we don't try deleting files, etc twice.
private var stopping false

private var mapStatusnull

private val writeMetrics new shuffleWriteMetrics SomewriteMetrics/** Write a bunch of records to this task's output */
override def writeIteratorK, VUnit if depif depthrow new "Aggregator is empty for map-side combine"sorter new K, V, Cdep, Somedep, dep, depsorterelse // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter new K, V, V, Somedep, , depsorter val ShuffleBlockIddepshuffleId, , 0outputFile blockManagerdiskBlockManagerindexFile blockManagerdiskBlockManager".index"val sorter, // Register our map output with the ShuffleBlockManager, which handles cleaning it over time
blockManagershuffleBlockManagerdepshuffleId, , numPartitionsmapStatus new blockManagerblockManagerId,
compressSize

  有关shuffle的细节,甚至是原理,都理解的不够深入,还有很多的需要学习。


















posted on 2015-01-26 00:33 过雁 阅读(...) 评论(...) 编辑 收藏