Shuffle的两种写操作

一、简介

        在Hadoop的MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要被使用到Reduce中必须经过Shuffle这个环节。由于Shuffle阶段涉及磁盘的读写和网络传输,因此Shuffle的性能高低直接影响到整个程序的性能和吞吐量。在MapReduce过程中,需要各个节点上的同一类数据汇集到某个节点进行计算,把这些分布在不同节点的数据按照一定的规则聚集到一起的过程称为Shuffle。

二、Shuffle的写操作

1、基于哈希的Shuffle写操作

        在Hadoop中Reduce所处理的数据都是经过排序的,但实际处理中,很多场景并不需要排序,因此在Spark1.0中实现的是基于Hash的Shuffle写操作机制。如下图:

Shuffle的两种写操作

        在该机制中每一个Mapper会根据Reduce的数量创建出相应的bucket(每一个bucket其实就是一个文件),bucket的数量是M*R,其中M表示Map Task的数量,R表示Reduce Task的数量。

        Spark1.0假定大多数情况下,Shuffle的数据排序是不必要的,比如WordCount,强制进行排序只能使性能变差,因此Spark1.0并不在Reduce端做merge sort,而是使用聚合。聚合实际上是一个HashMap,它以当前任务输出结果的key作为键值,以任意要combine类型作为value值,当在Word Count的Reduce进行单词计数时,它会将Shuffle读到的每一个键值对更新,或者插入到Hash Map中。这样就不需要预先把所有的键值对进行merge sort,而是来一个处理一个。

2、基于排序的Shuffle写操作

        虽然基于Hash的Shuffle写操作能够较好的完成Shuffle数据的写入,但明显存在两大问题:

①每个MapTask为每个Reduce Task都创建一个单独的文件,加入M=1000,R=1000,那么将会生成1M个文件,在Shuffle数据量不大,文件很多的情况下,会严重降低I/O的性能;

②基于Hash的Shuffle过程,会使用到一个DiskBlockObjectWriter对象,每一个文件都会生成一个Writer Handler对象,默认情况下需要100KB的内存,那么1M的文件,就需要100GB内存,尽管Shuffle实际写操作是分时运行,但缓存开销也是非常大。

        Spark1.1版本中借鉴了Hadoop在Shuffle的处理方式,引入基于排序的Shuffle写操作机制。在该机制中,每个Map Task不会为后续的每个任务创建单独的文件,而是会将所有的结果写到同一个文件中,对应的生成一个Index文件进行索引。通过这种机制,避免了大量文件的产生,一方面可以减轻文件系统管理的压力,另一方面可以减少Writer Handler对内存的开销,节省了GC的风险和频率。基于排序的Shuffle写操作架构图如下:

Shuffle的两种写操作

        基于排序的Shuffle过程中,先判断Map Task输出结果在Map端是否需要进行combine操作,如果需要合并,则外部排序中进行聚合并排序;如果不需要,则外部排序中不进行聚合和排序。确认外部排序的方式后,在外部排序中,将使用PartitionAppendOnlyMap来存放数据,当排序中的Map数据占用内存超过阈值时,则将Map中的数据溢写到磁盘中,每一次溢写都会生成一个Segment文件。当所有数据处理完毕后,在外部排序中有可能一部分计算结果仍然存在于内存中,而另一部分计算结果溢写到一个或多个Segment文件中,这时会通过merge操作将内存和spill文件中的内容合并到一个文件里。