Spark join的分类
当前SparkSQL支持三种Join算法:shuffle hash join
、broadcast hash join
以及sort merge join
。
Hash Join
采用hash join算法,整个过程会经历三步:
-
确定Build Table以及Probe Table:Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就可以join在一起。通常情况下,小表会作为Build Table,大表作为Probe Table。此事例中item为Build Table,order为Probe Table。
-
构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存。
-
探测:再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id = order.i_id),如果匹配成功就可以将两者join在一起。
Broadcast Hash Join
将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,可以直接广播的场景。broadcast hash join可以分为两步:
- broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的p2p思路
- hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探
SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold
,默认为10M。
Shuffle Hash Join
在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区,这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。默认设置的shuffle partition的个数是200,然后两张表的key值分别去基于200做hash取余然后散步在每个区域中了,这样的思想先把相近的合并在一个区内,再在每个分区内去做比较key值的等值比较,就避免了大范围的遍历比较,节省了时间和内存。如下图所示,shuffle hash join也可以分为两步:
- shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点
- hash join阶段:每个分区节点上的数据单独执行hash join算法
Sort-Merge Join
SparkSQL对两张大表join采用sort-merge join。在Shuffle Hash Join方法的基础上,hash取余之后要分别对两张表的key值进行排序,这样去做等值比较的时候就不需要将某一方的全部数据都加载到内存进行计算了,而是即用即取即丢,从而大大提升了大数据量下sql join的稳定性。因为两个序列都是有序的,从头遍历,碰到key相同的就输出;如果不同,左边小就继续取左边,反之取右边。如下图所示,整个过程分为三个步骤:
- shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理
- sort阶段:对单个分区节点的两表数据,分别进行排序
- merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边,见下图示意:
sort-merge join的代价并不比shuffle hash join小,那为什么SparkSQL还会在两张大表的场景下选择使用sort-merge join算法呢?
这和Spark的shuffle实现有关,目前Spark的shuffle实现都适用sort-based shuffle算法,因此在经过shuffle之后partition数据都是按照key排序的。因此理论上可以认为数据经过shuffle之后是不需要sort的,可以直接merge。