漫谈分布式系统(21) -- 基于代价的优化
这是《漫谈分布式系统》系列的第 21 篇,预计会写 30 篇左右。每篇文末有为懒人准备的 TL;DR,还有给勤奋者的关联阅读。扫描文末二维码,关注公众号,听我娓娓道来。也欢迎转发朋友圈分享给更多人。
贴近性能优化的本质
上一篇文章,简单介绍了基于规则的优化 RBO,最后提到 RBO 由于是从经验归纳而来的,所以无法解决很多问题。
根本原因在于经验归纳这个方法是有缺陷的,很难面面俱到。
比如查询走索引的例子,如果 value 分布非常不均衡,那走索引反倒不如全表扫描。
再比如多表 join 的情况,哪个表放前面更好,显然是无法总结出固定规则的。
总之,抽象过的 RBO 太间接以至于脱离实际情况,所以我们需要一个更加直接和贴近细节的优化方法。
先思考几个更加基本的问题。
性能优化的目标是什么?
更短的执行时间,
更低的延迟,
更大的吞吐量,
更少的资源使用,
......
性能优化的重点是什么?
在分布式程序里面,我们重点关注了 Shuffle,因为涉及非常多 IO,IO 是最拖累性能的,
在 SQL Join 里面,我们除了关注 Shuffle,还考虑了 Match,因为涉及很多 IO 和计算量,
在 Spark ML 里,我们通过 cache 数据来避免重复迭代计算,因为计算量实在太大,
......
好像越想越复杂了,但这还没完,还有句话叫做:如果你无法度量,就无法优化。
那性能又该怎么度量?
在常规算法里,我们通过时间复杂度和空间复杂度来衡量算法好坏,
那在分布式计算里呢,在 SQL 里呢?
时间复杂度和空间复杂度?似乎有些启发。
时间复杂度,可以对应计算量;空间复杂度,不就是内存占用吗?
这样看来,我们可以类似地,从资源占用的角度来设计分布式程序的性能度量方法。
于是问题变成,一个分布式程序会使用哪些系统资源?
先粗略地看下,对一个程序而言,会使用到一台计算机的主要资源有这些:
CPU
Memory
IO
Disk IO
Network IO
扩展到分布式程序下的多个节点,考虑到这篇文章的主题,以及我们最近关注的 Hive 和 Spark 都大量处理 HDFS 上的数据,而从 HDFS 读取数据,既有可能从本地磁盘读取,又有可能从网络拉取,并且无法事先确定,所以需要引入 HDFS IO。
而另一方面,对内存的使用是很难事先计算好的。这很好理解,比如我们前面讲过的 Shuffle 和 Join,对不同的数据量和参数设置,可能采取不同的算法,过程中还有 spill 等操作,再加上实际数据对压缩率的影响等等,都导致对内存使用的估算,是很难准确的。
基于这些考虑,我们对包括 SQL 在内的分布式程序的资源消耗,主要考虑这几方面:
CPU
IO
Disk IO
Network IO
HDFS IO
我们做性能优化的目的,就是降低这几方面的消耗;我们做性能优化的效果,也通过这几个属性来衡量。
定义代价模型
上面列了几个衡量性能的属性,但还需要量化成指标,才好计算。
以 Hive 为例,不难定义出一些参数,Spark SQL 也大同小异。
单位消耗:
Hr - 从 HDFS 读 1 byte 的消耗,单位纳秒,
Hw - 往 HDFS 写 1 byte 的消耗,单位纳秒,
Lr - 从本地磁盘读 1 byte 的消耗,单位纳秒,
Lw - 往本地磁盘写 1 byte 的消耗,单位纳秒,
NEt – 网络传输 1byte 的消耗,单位纳秒,
CPUc – 比较操作的 CPU 消耗,单位纳秒,
统计数据:
T(R) - 行数,
Tsz – 每行的平均大小,
V(R, a) – 表 R 里 a 列的 distinct count 数。
指标有了,还得有公式,算出一个可以直接比较的结果,这样才可用。
上篇文章我们提过,SQL 解析和 RBO 优化都是以 AST 为基础的。
如上图这个例子,既然 RBO 都能以 AST 为基础做优化,那我们是不是也能以 AST 为基础来设计我们的资源消耗公式。
可以看到,这个 AST 是由一系列的操作组成的,如果我们能算出每一个操作的资源消耗,那理论上,全加起来,就是这颗 AST 的总消耗了,也就是这条 SQL 的消耗了。
于是问题变成,怎么计算每种操作的消耗?
也不麻烦,毕竟 SQL 作为声明式语言,支持的操作是有限的,全部都定义一遍就好了。
对于代价,除了 CPU 和 IO,我们还考虑下行数,原因待会再讲。
先看最简单的,Table Scan 的消耗。扫描一张表,不涉及 CPU,都是 HDFS IO 操作,行数为表的行数。
CPU Cost = 0,
IO Cost = Hr * T(R) * Tsz,
Number of Rows = T(R)。
再来个稍微复杂点的,两张表做 Map Join 的消耗,先把小表分发到各个 mapper,再构建 HashTable,然后做 Hash Join。
CPU Cost = (T(R2)+(T(R1)+T(R2)))* CPUc,
IO Cost = T(R2) * Tsz * NEt * number of mappers,
Number of Rows = Join Cardinality。
最后看一个 Filter 的例子,过滤数据是纯 CPU 开销,因为 IO 开销已经在 Filter 的上一个环节计算过了。
CPU Cost = T(R) * CPUc ,
IO Cost = 0,
Number of Rows = Filter Selectivity * Number of Rows from Child。
类似上面三个例子这样,所有的操作都可以计算出开销,再把 AST 里每个操作的开销加起来就能得到总的开销。
这个总的开销,我们定义为 cost,这种计算每个计划的 cost,再选择 cost 最低的优化方式,就是所谓的基于代价的优化(Cost Based Optimization),简称 CBO。
但是,上面我们列了几种不同的开销,不同属性要怎么加呢?还有为什么要单独把行数拎出来看呢?
我们从源码里找答案。
先看 Hive 怎么比较两个执行计划的代价:
可以看到,Hive 认为 CPU 和 IO 更重要,所以要优先比较,但是 CPU 和 IO 间就不再深究,直接相加,如果 CPU+IO 相等,再比较行数。
上一篇我们提过,Hive 的 Optimizor 是用的 Apache Calcite 的,而 Calcite 是这样比较两个计划的代价的:
居然只用考虑了行数。
所以,我们可以大致理解,Hive 采用这样的方式比较代价,一方面是贴近代价的本质的,另一方面,也保留了最基础和传统的以行数为核心的代价作为补充。
用行数来兜底,是可以理解的,毕竟还有内存等我们没有考虑或者无法考虑的因素,没有被算在代价之内,而行数的抽象级别介于规则和资源消耗之间,相对合适。
但把 CPU 和 IO 直接相加,就有点不严谨了。在一个资源竞争的环境中,CPU 和 IO 的成本和稀缺性是不一样。
所以 Spark SQL 作为后起之秀,代价公式又比 Hive 的更好点。
稍早些的时候,Spark SQL 对 CPU 和 IO 用权重参数来区分重要性,代价公式按照 weight * CPU Cost + (1-weight) * IO Cost
的思路设计。
实际上为了简化,使用的是 weight * cardinality + (1-weight) * size
,用 cardinality 和 size 来分别代替 CPU 和 IO。
后来又进化成下面这样:
除了绝对的权重参数外,对 cardinality 和 size 分别计算相对值,这样相对绝对者相加的方法,也能更好地避免相互影响。
可以看到,代价公式是在不断进化的,希望尽量能在合理性和成本上取得平衡。
采集统计数据
前面说过,计算代价的时候,除了固定的单位消耗外,还有一些统计数据。这很好理解,单价*数量,才等于总价嘛。
而这些统计数据,很显然,由于直接和具体数据有关,是需要采集计算的。
再细看,这些统计数据可以分为两种。
表级别的统计数据,如行数、大小等,
列级别的统计数据,如最大最小值等。
在主流数据库,包括我们重点关注的 Hive 和 Spark 里,都提供了 analyze
命令给我们主动采集这些数据。
比如这样一张 Hive 表的分区,起初 numRows
和 rawDataSize
的值都是 -1,也就是未知。
我们执行下 analyze table [table_name] partition([partition_expr]) compute statistics;
,再 describe 看下:
numRows
和 rawDataSize
就都有了真实值了。
而上面在定义 Filter 操作的代价时,Number of Rows = Filter Selectivity * Number of Rows from Child
,我们遇到了一个新的叫做 Selectivity
的参数。
Selectivity
是指过滤的比例,显然需要列级别的数据分布才能估算。
通常会采用上图这样的等高(equi-height)图,来记录数据的分布。因为等高分布图比线性分布图更符合实际情况,相对于等宽(equi-width)图,也能更好的避免数据倾斜带来的估算偏差。
类似数据分布这种 histogram 图,就需要先采集列级别的统计数据,只需要在刚才采集表级统计数据的 analyze
命令后面加上 for columns
就行了。
这样,通过这两个不同级别的 analyze 命令,就能把 CBO 需要的统计数据提前采集好,并保存到元数据库中。等到任务执行的时候,就直接作为代价计算的依据。
但是也很显然,这些统计数据是需要消耗资源计算的,比如在 Hive 里,就会执行 MR 任务去算。
所以就需要我们明智地有选择地执行:
按需决定范围,很少会用的表或列,可以不用算,
更新统计数据的频率,不需要比数据本身更新地更频繁,
尽量在业务低峰期算,以免影响业务应用。
见识下 CBO 的威力
上面说这么多,都不够接地气。到底 CBO 效果有没有那么神奇,需要实践检验。
前面我们说过, Join Order 是 RBO 无能为力的,需要 CBO 出马。我们就以这个例子来说明 CBO 的威力。
首先,从前面文章对 join 原理的介绍,我们能知道,多表 join 的时候,哪个在前,哪个在后,对性能的影响是巨大的。
以三张表 A join B join C 为例,各表行数和 join 结果如下图所示:
经过 CBO 优化后,会对 join 顺序做调整:
可以看到,调整顺序后,先 A join C,使得中间结果相比默认 A join B 的方式,从 2.5 billion 行下降到 500 million 行。这样,和第三张表的 join 数据量就减少了 80%,整个查询的性能也就大幅提升了。
这个优化,是 RBO 无能为力的,你无法总结出一条规则,来决定哪个表应该放在前面。
而这正是 CBO 的特长,手握每张表的统计数据,就能算出不同 join order 的实际开销,再从中选出代价最小的计划执行。
RBO 和 CBO 的关系
这两篇文章看下来,似乎 CBO 比 RBO 要好多了,更加量化,更加灵活。
Oracle 甚至明确在 RBO 的文档中说到:
Oracle Corporation strongly advises the use of cost-based optimization. Rule-based optimization will be deprecated in a future release.
并且在 10g 版本后,真的就把 RBO 移除了。
但实际上,RBO 和 CBO 并不是替换的关系。
看一下 Spark Catalyst 的架构图:
很明显,Spark 是先做完 RBO,然后再做 CBO。
这也很好理解。
一方面,RBO 虽然有不足,但也有很多非常确定和有效的规则,比如 Predicate Pushdown,不用多可惜,
另一方面,CBO 是有前提和代价的,如果没有足够的统计数据支撑,CBO 就不能发挥足够的作用,这个时候有 RBO 托住性能底线,就很有必要了。
所以 RBO 和 CBO 就成了这样共存的关系,各司其职,一起保障 SQL 查询性能。
TL;DR
这篇文章,主要介绍了为什么会有 CBO,以及 CBO 是怎么实现的。
基于经验归纳的 RBO,很难面面俱到,需要更加直接的性能优化方法,
通过对资源的抽象,新的性能优化方法应当关注 CPU、IO 的消耗,
有了指标,还需要确定计算公式,才能方便地比较消耗,或者叫代价,这种优化方法就叫 CBO,
一条 SQL 的代价可以从 AST 逐层累加起来,于是需要定义每个操作的代价,
Hive 和 Spark 都定义了自己的代价公式,可以看到,代价公式是逐渐演变的,需要平衡准确性和成本,没有完美的公式,
CBO 非常依赖数据统计分析,需要通过 analyze 命令事先采集统计数据,
RBO 和 CBO 并不是替代关系,而是在查询优化的不同阶段发挥自己的作用。
目前主流的数据库,包括传统关系数据库和分布式数据库,基本都以 CBO 作为 SQL 查询性能优化的主要手段。也确实大幅提高了整体性能,降低了使用成本。
但 CBO 重度依赖统计数据,而这些数据的采集计算是有成本的,也面临更新不及时的问题,使得 CBO 很难产生理想中的作用。
下一篇,我们就一起看下,有没有比 CBO 更加灵活的 SQL 查询性能优化方式。
关联阅读
原创不易
关注/分享/赞赏
给我坚持的动力