spark源码阅读——5. shuffle
shuffle是什么:
分布式计算中,每个节点只计算部分数据,也就是只处理一个分片,那么要想求得某个key对应的全部数据,比如reduceByKey、groupByKey,那就需要把相同key的数据拉取到同一个分区,原分区的数据需要被打乱重组,这个按照一定的规则对数据重新分区的过程就是Shuffle(洗牌)。
Shuffle是连接Map和Reduce之间的桥梁,描述的是数据从Map端到Reduce端的过程,
map阶段的结果会写到shuffle,reduce阶段再拉取。
Hadoop的Shuffle过程
大致分为排序(sort)、溢写(spill)、合并(merge)、拉取拷贝(Copy)、
合并排序(merge sort)这几个过程,大体流程如下:
上图的红绿蓝代表3种key,shuffle过程就是把相同的Key拉取到同一分区,供Reduce节点处理。
Map端:sort spill merge
1.sort(排序)
map端输出的数据,先写到环形缓冲区kvbuffer,溢出则写到磁盘(也可能一直没有达到阀值,也一样要将内存中的数据写入磁盘)。写磁盘之前会排序,按照partition和key两个关键字来排序,排序结果是数据按照partition为单位聚集在一起,同一partition内数据按照key排序。
2.spill(溢写)
当排序完成,便开始把数据刷到磁盘,刷磁盘的过程以分区为单位,一个分区写完,写下一个分区,分区内数据有序,最终生成多个文件
3.merge(合并)
上一步会生成的多个小文件,对于Reduce端拉取数据是相当低效的,那么这时候就有了merge的过程,该过程会将每一个Spill.out文件合并成为一个大文件,合并的过程也是同分片的合并成一个片段(segment),最终所有的segment组装成一个最终文件。
Reduce端:fetch copy和merge sort
1.拉取拷贝(fetch copy)
Reduce任务通过向各个Map任务拉取对应分片。
这个过程都是以Http协议完成,每个Map节点都会启动一个常驻的HTTP server服务,Reduce节点会请求这个Http Server拉取数据,这个过程完全通过网络传输,所以是一个非常重量级的操作。
2.合并排序 (merge sort)
拉取到各个Map节点对应分片的数据之后,会进行再次排序,排序完成,结果丢给Reduce函数进行计算。
总结:
1、shuffle过程就是为了对key进行全局聚合
2、排序操作伴随着整个shuffle过程,所以Hadoop的shuffle是sort-based的
spark的shuffle过程:
Spark shuffle分为write和read两个过程
1.shuffle write
将MapTask产生的数据写到磁盘中,
首先将map的结果文件中的数据记录送到对应的bucket里面(缓冲区),之后,每个bucket里面的数据会不断被写到本地磁盘上,形成一个ShuffleBlockFile,或者简称FileSegment
2.shuffle read
ReduceTask再将数据抓取(fetch)过来
因为不需要排序,fetch一旦开始,就会边fetch边处理(reduce)
spark2.0版本后放弃了Hash Based Shuffle,使用Sort Based Shuffle,
目的是在处理大规模的数据上也不会很容易达到性能瓶颈
其实 Sorted-Based Shuffle 也有缺点,其缺点反而是它排序的特性,它强制要求数据在 Mapper 端必须要先进行排序 (注意,这里没有说对计算结果进行排序),所以导致它排序的速度有点慢。而 Tungsten-Sort Shuffle 对它的排序算法进行了改进,优化了排序的速度。
https://www.bbsmax.com/A/lk5anoONd1/
需要注意:
Hash Based Shuffle会在磁盘上生成大量中间文件,这些文件将被保留,
这样做是为了在重新计算时不需要重新创建shuffle文件。
这意味着长时间运行的Spark作业可能会占用大量磁盘空间。
Hadoop和Spark的Shuffle区别:
1、Hadoop中有一个Map完成,Reduce便可以去fetch数据了,不必等到所有Map任务完成,而Spark的必须等到父stage完成,也就是父stage的map操作全部完成才能去fetch数据。
这是因为spark必须等到父stage执行完,才能执行子stage,迎合stage规则。
2、Hadoop的Shuffle是sort-base的,那么不管是Map的输出,还是Reduce的输出,都是partion内有序的,而spark不要求这一点。
存在疑问,spark最开始是Hash Based Shuffle,不需要排序。
然后引入Sort Based Shuffle,是要排序的。
Spark 2.0版本Hash Based Shuffle退出历史舞台。
3、Hadoop的Reduce要等到fetch完全部数据,才将数据传入reduce函数进行聚合,而spark是一边fetch一边聚合。