Spark Hash Shuffle笔记

一、Shuffle的含义
Hadoop中,Shuffle产生于Map和Reduce之间。
需要Shuffle的关键原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。
二、Shuffle操作可能面临的问题(运行Task时才会产生Shuffle操作):
数据量可能较大,不同节点间网络传输问题;
数据如何分类,即如何Partition:Hash、Sort、Spark钨丝计划;
负载均衡(数据倾斜问题:在计算数据的时候,数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢);
网络传输效率:需要在压缩与解压缩之间做出权衡,序列化和反序列化也需要被考虑;
具体的Task进行计算时尽可能使得数据具备Process Locality特性,即内存本地性,数据存放于内存。如果不能满足,则增加数据分片,减少每个Task处理的数据量。
默认在执行Shuffle操作前对数据进行持久化,默认存放到本地磁盘。
三、Hash Shuffle
1. Key不能是Array数组;
2. Hash Shuffle不需要排序,节省了Hadoop MapReduce中进行Shuffle需要排序的时间浪费,因为在实际生产环境下有大量的不需要排序的Shuffle类型;
不需要排序的Hash Shuffle不一定比需要排序的Sorted Shuffle速度快。在数据规模较小时,Hash Shuffle会比Sorted Shuffle速度快很多;数据量较大时,Sorted Shuffle一般比Hash Shuffle快很多。
3. 每个ShuffleMapTask会根据key的哈希值计算出当前key需要写入的Partition,然后把决定后的结果写入单独的文件中。此时会导致每个Task产生R(指下一个Stage的全部任务数)个文件,如果当前的Stage中有M个ShuffleMapTask,则会产生M*R个文件。
Shuffle操作绝大多数情况下都要通过网络,如果Mapper和Reducer在同一台机器上,此时只需要读取本地磁盘即可。
Hash Shuffle的两大死穴:1.Shuffle前会产生海量小文件于磁盘上,此时会产生大量耗时低效的IO操作;2. 内存不足。由于内存中需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模较大,内存不可承受,会发生内存溢出。

Spark Hash Shuffle笔记
为了改善上述问题(同时打开过多文件导致Writer Handler内存使用过大以及产生过多文件导致大量的随机读写带来的效率极为低下的磁盘IO操作),Spark推出了Consalidate机制,把小文件合并,此时Shuffle时文件产生的数量为CPU cores个数*Reducer个数,对于ShuffleMapTask的数量明显多于同时可用的并行Cores的数量的情况下,Shuffle产生的文件数量会大幅减少,降低OOM的可能,但在实际工程环境下,并行CPU Cores数量可能较大;
为此Spark推出了Shuffle Pulggable开放框架,方便系统升级时定制Shuffle功能模块,也方便第三方系统改造人员根据实际业务场景开发具体最佳的Shuffle模块:核心接口ShuffleManager,具体默认实现有HashShuffleManager和SortShuffleManager等。
Sorted-Based Shuffle把产生的所有小文件存入一个文件中,并产生一个索引文件,最后进行归并排序。Reducer通过索引文件获得所需数据。