Hive 优化而非

3.5 SQL优化

3.5.1 列裁剪

  Hive在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他列。例如,若有以下查询:

SELECT a,b FROM q WHERE e<10;

  在实施此项查询中,Q表有5列(a,b,c,d,e),Hive只读取查询逻辑中真实需要的3列a、b、e, 而忽略列c,d;这样做节省了读取开销,中间表存储开销和数据整合开销。

  裁剪对应的参数项为:hive.optimize.cp=true(默认值为真)

3.5.2 分区裁剪

  可以在查询的过程中减少不必要的分区。例如,若有以下查询:

SELECT * FROM (SELECT a1, COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100; # (多余分区)
SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100;

  查询语句若将"subq.prtn=100"条件放入子查询中更为高效,可以减少读入的分区数目。Hive自动执行这种裁剪优化。

  分区参数为:hive.optimize.pruner=true(默认值为真)

3.5.3 熟练使用SQL提高查询

  熟练地使用SQL,能写出高效率的查询语句。

  场景:有一张user表,为卖家每天收到表,user_id,ds(日期)为key,属性有主营类目,指标有交易金额,交易笔数。每天要取前10天的总收入,总笔数,和最近一天的主营类目。

  解决方法 1 如下所示:常用方法

INSERT OVERWRITE TABLE t1
SELECT user_id, substr(MAX(CONCAT(ds,cat),9) AS main_cat) FROM users
WHERE ds=20120329 // 20120329 为日期列的值,实际代码中可以用函数表示当天日期GROUP BY user_id;

INSERT OVERWRITE TABLE t2
SELECT user_id,sum(qty) AS qty, SUM(amt) AS amt FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id;

SELECT t1.user_id, t1.main_cat, t2.qty, t2.amt FROM t1
JOIN t2 ON t1.user_id=t2.user_id

  下面给出方法1的思路,实现步骤如下:

  第一步:利用分析函数,取每个user_id最近一天的主营类目,存入临时表t1;

  第二步:汇总10天的总交易金额,交易笔数,存入临时表t2;

  第三步:关联t1、t2,得到最终的结果。

  解决方法 2 如下所示:优化方法

SELECT user_id, substr(MAX(CONCAT(ds, cat)), 9) AS main_cat, SUM(qty), SUM(amt) FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id

  在工作中我们总结出:方案2的开销等于方案1的第二步开销,性能提升,由原有的25分钟完成,缩短为10分钟以内完成。节省了两个临时表的读写是一个关键原因,这种方式也适用于Oracle中的数据查找工作。

  SQL具有普适性,很多SQL通用的优化方案在Hadoop分布式计算方式中也可以达到效果。

3.5.5 不同数据类型关联产生的倾斜问题

  问题:不同数据类型id的关联会产生数据倾斜问题。

  一张表的s8_log的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8_log的日志中有32位字符串商品id,也有数值商品id,日志中类型是string的,但商品中的数值id是bigint的。猜想问题的原因是把s8_log的商品id转成数值id做hash来分配Reduce,所以字符串id的s8_log日志,都到一个Reduce上了,解决的方法验证了这个猜测。

  解决方法:把数据类型转换成字符串类型

SELECT * FROM s8_log a LEFT OUTER JOIN r_auction_auctions b ON a.auction_id=CAST(b.auction_id AS STRING)

  调优结果显示:数据表处理由1小时30分钟经代码调整后可以在20分钟内完成。

3.5.6 利用Hive对UNION ALL优化的特性

  多表union all会优化成一个job。

  问题:比如推广效果表要和商品表关联,效果表中的auction_id列既有32位字符串商品id,也有数字id,和商品表关联得到商品的信息。

  解决方法:Hive SQL性能会比较好

SELECT * FROM effect a
JOIN
(SELECT auction_id AS auction_id FROM auctions
UNION ALL
SELECT auction_string_id AS auction_id FROM auctions
) b
ON a.auction_id=b.auction_id

  比分别过滤数字id,字符串id然后分别和商品表关联性能要好。

  这样写的好处:1个MapReduce作业,商品表只读一次,推广效果表只读取一次。把这个SQL换成Map/Reduce代码的话,Map的时候,把a表的记录打上标签a,商品表记录每读取一条,打上标签b,变成两个<key, value>对,<(b,数字id),value>,<(b,字符串id),value>。

  所以商品表的HDFS读取只会是一次。

3.5.7 解决Hive对UNION ALL优化的短板

  Hive对union all的优化的特性:对union all优化只局限于非嵌套查询

  • 消灭子查询内的group by

  示例1:子查询内有group by

SELECT * FROM
(SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3) t3
GROUP BY c1,c2,c3

  从业务逻辑上说,子查询内的GROUP BY怎么看都是多余(功能上的多余,除非有COUNT(DISTINCT)),如果不是因为Hive Bug或者性能上的考量(曾经出现如果不执行子查询GROUP BY,数据得不到正确的结果的Hive Bug)。所以这个Hive按经验转换成如下所示:

SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2) t3 GROUP BY c1,c2,c3

  调优结果:经过测试,并未出现union all的Hive Bug,数据是一致的。MapReduce的作业数由3减少到1。

  t1相当于一个目录,t2相当于一个目录,对Map/Reduce程序来说,t1、t2可以作为Map/Reduce作业的mutli inputs。这可以通过一个Map/Reduce来解决这个问题。Hadoop的计算框架,不怕数据多,就怕作业数多。

  但如果换成是其他计算平台如Oracle,那就不一定了,因为把大输入拆成两个输入,分别排序汇总成merge(假如两个子排序是并行的话),是有可能性能更优的(比如希尔排序比冒泡排序的性能更优)。

  • 消灭子查询内的COUNT(DISTINCT),MAX,MIN
SELECT * FROM 
(SELECT * FROM t1
UNION ALL SELECT c1,c2,c3 count(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3
GROUP BY c1,c2,c3

  由于子查询里头有COUNT(DISTINCT)操作,直接去GROUP BY将达不到业务目标。这时采用临时表消灭COUNT(DISTINCT)作业不但能解决倾斜问题,还能有效减少jobs。

INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3;
SELECT c1,c2,c3,SUM(income),SUM(uv) FROM
(SELECT c1,c2,c3,income,0 AS uv FROM t1
UNION ALL
SELECT c1,c2,c3,0 AS income, 1 AS uv FROM t2) t3
GROUP BY c1,c2,c3;

  job数是2,减少一半,而且两次Map/Reduce比COUNT(DISTINCT)效率更高。

  调优结果:千万级别的类目表,member表,与10亿级的商品表关联。原先1963s的任务经过调整,1152s即完成。

  • 消灭子查询内的JOIN  
SELECT * FROM
(SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x
GROUP BY c1,c2;

  上面代码运行会有5个jobs。加入先JOIN生存临时表的话t5,然后UNION ALL,会变成2个jobs。

INSERT OVERWRITE TABLE t5
SELECT * FROM t2 JOIN t3 ON t2.id=t3.id;
SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);

  调优结果显示:针对千万级别的广告位表,由原先5个Job共15分钟,分解为2个job,一个8-10分钟,一个3分钟。

3.5.8 COUNT(DISTINCT)

  计算uv的时候,经常会用到COUNT(DISTINCT),但在数据比较倾斜的时候COUNT(DISTINCT)会比较慢。这时可以尝试用GROUP BY改写代码计算uv。数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:

  • 原有代码
①
INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329) 
SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t WHERE t.ds=20120329 GROUP BY adzoneid;

②
select count(distinct id) from bigtable;

  关于COUNT(DISTINCT)的数据倾斜问题不能一概而论,要依情况而定,下面是我测试的一组数据:

  测试数据:169857条

Hive 优化而非

①
#统计每日IP
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS FROM logdfs WHERE logdate='2014_12_29';
耗时:24.805 seconds
#统计每日IP(改造)
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate='2014_12_29') tmp;
耗时:46.833 seconds

②
 select count(id) from (select id from bigtable group by id) a;

Hive 优化而非

测试结果表明:明显改造后的语句比之前耗时,这时因为改造后的语句有2个SELECT,多了一个job,这样在数据量小的时候,数据不会存在倾斜问题。

3.5.9 JOIN操作

3.5.9.1 小表、大表JOIN

  在使用写有Join操作的查询语句时有一条原则:应该将条目少的表/子查询放在Join操作符的左边。原因是在Join操作的Reduce阶段,位于Join操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生OOM错误的几率;再进一步,可以使用Group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

  实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别。

  案例实操:

  (1)关闭mapjoin功能(默认是打开的)

    set hive.auto.convert.join=false;

  (2)执行小表JOIN大表语句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
left join bigtable  b
on b.id = s.id;

  Time taken: 35.921 seconds

  (3)执行大表JOIN大表语句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable  b
left join smalltable  s
on s.id = b.id;

  Time taken: 34.196 seconds;

3.5.9.2 大表JOIN大表

1)空Key过滤

  问题:日志中常会出现信息丢失,比如每日约为20亿的全网日志,其中的user_id为主键,在日志收集过程中会丢失,出现主键为null的情况,如果取其中的user_id和bmw_users关联,就会碰到数据倾斜的问题。原因是Hive中,主键为null值的项会被当做相同的Key而分配进同一个计算Map。

  解决方法1:user_id为空的不参与关联,子查询过滤null

SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id
UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL

  解决方法2 如下所示:函数过滤null

SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive', RAND()) ELSE a.user_id END = b.user_id;

  调优结果:原先由于数据倾斜导致运行时长超过1小时,解决方法1运行每日平均时长25分钟,解决方法2运行的每日平均时长在20分钟左右。优化效果很明显。

  我们在工作中总结出:解决方法2比解决方法1效果更好,不但IO少了,而且作业数也少了。解决方法1中log读取两次,job数为2。解决方法2中job数是1。这个优化适合无效id(比如-99,‘’,null等)产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的Reduce上,从而解决数据倾斜问题。因为空值不参与关联,即使分到不同的Reduce上,也不会影响最终的结果。附上Hadoop通用关联的实现方法是:关联通过二次排序实现的,关联的列为partition key,关联的列和表的tag组成排序的group key,根据partition key分配Reduce。同一Reduce内根据group key排序。

3.5.9.3 MAP JOIN操作

  如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

  • 开启MapJoin参数设置:

    1) 设置自动选择MapJoin

      set hive.auto.convert.join = true;默认为true

    2) 大表小表的阀值设置(默认25M一下认为是小表):

      set hive.mapjoin.smalltable.filesize=25000000;

  • MapJoin工作机制

  Hive 优化而非

 

  上图是Hive MapJoin的原理图,从图中可以看出MapJoin分为两个阶段:

  (1)通过MapReduce Local Task,将小表读入内存,生成内存HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。

  (2)MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。也就是在map端进行join避免了shuffle。

  Join操作在Map阶段完成,不再需要Reduce,有多少个Map Task,就有多少个结果文件。

  实例:

  (1)开启MapJoin功能

    set hive.auto.convert.join = true; 默认为true

  (2)执行小表JOIN大表语句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
join bigtable  b
on s.id = b.id;

  Time taken: 24.594 seconds

  (3)执行大表JOIN小表语句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable  b
join smalltable  s
on s.id = b.id;

  Time taken: 24.315 seconds

3.5.9.3 GROUP BY操作

  默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。进行GROUP BY操作时需要注意以下几点:

  • Map端部分聚合

  事实上并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。

  (1)开启Map端聚合参数设置

    set hive.map.aggr=true

  (2)在Map端进行聚合操作的条目数目

    set hive.grouby.mapaggr.checkinterval=100000

  (3)有数据倾斜的时候进行负载均衡(默认是false)

    set hive.groupby.skewindata = true

  • 有数据倾斜时进行负载均衡

  此处需要设定hive.groupby.skewindata,当选项设定为true时,生成的查询计划有两个MapReduce任务。在第一个MapReduce中,map的输出结果集合会随机分布到reduce中,每个reduce做部分聚合操作,并输出结果。这样处理的结果是,相同的Group By Key有可能分发到不同的reduce中,从而达到负载均衡的目的;第二个MapReduce任务再根据预处理的数据结果按照Group By Key分布到reduce中(这个过程可以保证相同的Group By Key分布到同一个reduce中),最后完成最终的聚合操作。

3.5.10 优化in/exists语句

  虽然经过测验,hive1.2.1也支持in/exists操作,但还是推荐使用hive的一个高效替代方案:left semi join

  比如说:    

      select a.id, a.name from a where a.id in (select b.id from b);
      select a.id, a.name from a where exists (select id from b where a.id = b.id);

  应该转换成:

      select a.id, a.name from a left semi join b on a.id = b.id;

3.5.11 排序选择

  • cluster by: 对同一字段分桶并排序,不能和sort by连用;
  • distribute by + sort by: 分桶,保证同一字段值只存在一个结果文件当中,结合sort by 保证每个reduceTask结果有序;
  • sort by: 单机排序,单个reduce结果有序
  • order by:全局排序,缺陷是只能使用一个reduce