Hive(十) Hive 优化
Hive 优化
关闭 hive 服务器和客户端,node3:hive --service metastore
Node4:hive
1 Fetch 抓取
set hive.fetch.task.conversion=none/more(默认值);
默认做了优化,以下两种情况都不经过 mr,改为 none 后,将走 mr。
以下 SQL 不会转为 Mapreduce 来执行
1. select 仅查询本表字段
select id,name from person;
2. where 仅对本表字段做条件过滤
select id,name,age from person where age =32;
Sql 分析:
explain select * from person; explain select count(*) from person; explain extended select count(*) from person;(了解) |
2 本地运行模式
set hive.exec.mode.local.auto;#默认 false hive> select count(*) from person; Time taken: 24.026 seconds, Fetched: 1 row(s) hive>set hive.exec.mode.local.auto=true;#开启本地模式 hive>select count(*) from person; Time taken: 3.747 seconds, Fetched: 1 row(s) |
开发和测试阶段使用本地模式,优点快,缺点是 http://node3:8088/cluster 看不到。对于小数据集 hive 通过本地模式在单机上处理任务,执行时间可以明显被缩短。设置 local mr 的最大输入数据量,当输入数据量小于这个值是采用 local mr 的方式,
默认 134217728 也就是 128M。若大于该配置仍会以集群方式来运行!
hive.exec.mode.local.auto.inputbytes.max=134217728
设置 local mr 的最大输入文件个数,当输入文件个数小于这个值是采用 local mr 方式
hive.exec.mode.local.auto.input.files.max=4 #默认 4
3 并行模式:
hive> set hive.exec.parallel; #默认为 false hive>select t1.ct,t2.ct from (select count(id) ct from person) t1,(select count(name) ct from person) t2; 看执行过程,Launching Job 1 out of 5 执行完,才执行 Launching Job 2 out of 5 hive>set hive.exec.parallel=true; hive>select t1.ct,t2.ct from (select count(id) ct from person) t1,(select count(name) ct from person) t2 Launching Job 1 out of 5 和 Launching Job 2 out of 5 并行执行。但时间有 可能并没有减少,因为需要两套资源,目前还是使用一套。资源充足的情况下,肯定并行更快。 hive> set hive.exec.parallel.thread.number; hive.exec.parallel.thread.number=8 并行进程默认是 8 个进程同时进行。 |
4 严格与非严格模式
- 参数设置
hive> set hive.mapred.mode=strict; hive> set hive.mapred.mode=nostrict |
- 查询限制:
1、对于分区表,必须添加 where 对于分区字段的条件过滤;
hive> select * from person5; FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "person5" Table "person5" hive> select * from person5 where age=10; #可以查询 |
2、order by 语句必须包含 limit 输出限制;
hive> select * from person order by id desc; FAILED: SemanticException 1:30 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'id' hive> select * from person order by id desc limit 10; 使用 order by 语句必须使用 limit 语句。 |
3、限制执行笛卡尔积的查询。
(1) 严格模式下避免出现笛卡尔积。
5 Hive 排序
- order by - 对于查询结果做全排序,只允许有一个 reduce 处理
(当数据量较大时,应慎用。严格模式下,必须结合 limit 来使用)
- sort by - 对于单个 reduce 的数据进行排序
- distribute by - 分区排序,经常和 Sort By 结合使用
- cluster by - 相当于 sort By + distribute By
(Cluster By 不能通过 asc、desc 的方式指定排序规则;可通过 distribute by column sort by column asc|desc 的方式)
select * from person sort by id desc; |
6 分区剪裁、列裁剪
- 尽可能早的过滤掉尽可能多的数据,避免大量数据流入外层 sql
- 分区剪裁
- 分区在 hive 上本质是目录,分区剪裁可以高效的过滤掉大部分数据。
- 尽量使用分区剪裁
- 列裁剪
- 只获取需要的列的数据,减少数据输入。
- 少用 select *
select id,name From (select id,name from person) tmp
7 Hive - JVM 重用
适用场景:
1、小文件个数过多
2、task 个数过多
通过 set mapred.job.reuse.jvm.num.tasks=n; 来设置
(n 为 task 插槽个数)默认为 1。
优点:JVM 重用使得 JVM 实例在同一个 Job 中重新使用多次,减少进程启动和销毁时间和频繁申请资源的系统开销。
缺点:设置开启之后,task 插槽会一直占用资源,不论是否有 task 运行,直到所有的 task即整个 job 全部执行完成时,才会释放所有的 task 插槽资源!
8 推测执行
根据一定的法则推测出“拖后腿”的任务,并未这样的任务启动一个备份任务,让该任务和原始任务同时处理“同一份”数据,并最终选择最先成功完成任务的计算结果作为最终结果。
set hive.mapred.reduce.tasks.speculative.execution=true;#开启推测
9 表优化
9.1 小表与大表 join
Hive Join
官网搜索:join->LanguageManual Joins
join_table: table_reference [INNER] JOIN table_factor [join_condition] | table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition | table_reference LEFT SEMI JOIN table_reference join_condition | table_reference CROSS JOIN table_reference [join_condition] (as of Hive 0.10) table_reference: table_factor | join_table table_factor: tbl_name [alias] | table_subquery alias | ( table_references ) join_condition: ON expression |
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key= b.key1)
is converted into a single map/reduce job as only key1 column for b is involved in the join.
On the other hand
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key= b.key2)
is converted into two map/reduce jobs because key1 column from b is used in the first join condition and key2 column from b is used in the second one. The first map/reduce job joins a with b and the results are then joined with c in the second map/reduce job.
https://blog.****.net/sofuzi/article/details/81265402
reduce e side join
reduce side join 是一种最简单的 join 方式,其主要思想如下:
在 map 阶段,map 函数同时读取两个文件 File1 和 File2,为了区分两种来源的 key/value 数据对,对每条数据打一个标签> (tag),比如:tag=1表示来自文件 File1,tag=2 表示来自文件 File2。即:map 阶段的主要任务是对不同文件中的数据打标签。
> 在 reduce 阶段,reduce 函数获取 key相同的来自 File1 和 File2 文件的 value list,然后对于同一个key,对File1和File2中的数据进行join (笛卡尔乘积)。即:reduce 阶段进行实际的连接操作。
map e side join
之所以存在 reduce side join,是因为在 map 阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join 是非常低效的,因为 shuffle 阶段要进行大量的数据传输。Map sidejoin 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多 份,让每个 map task 内存中存在一份(比如存放到 hash table 中),然后只扫描大表:对于大表中的每一条记录 key/value,在
hash table 中查找是否有相同的 key 的记录,如果有,则连接后输出即可。
Join 计算时,将小表(驱动表)放在 join 的左边
Map Join:在 Map 端完成 Join
两种实现方式:
1、SQL 方式,在 SQL 语句中添加 MapJoin 标记(mapjoin hint)
语法:
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM smallTable JOIN bigTable ON smallTable.key = bigTable.key; |
2、开启自动的 MapJoin
大小判断标准,表数据的大小而不是行数。(通常行少的数据一般是小表)hive.ignore.mapjoin.hint;默认为 true,如果自动和手动冲突了,手动的配置失效,以自动配置为准。
将 key 相对分散,并数据量小的表放在 join 的左边,这样可以有效减少内存溢出发生的几率;还可以使用 map join 让小的维度表(1000 条一下的记录数据)先进内存,在map 端完成 reduce。
测试发现新版的 hive 已经对小表 join 大表和大表 join 小表进行了优化,小表放在左边和右边已经没有明显的区别。
map join 原理分析:
在 Map 端完成 Join。如果不指定 MapJoin 或者不符合 MapJoin 的条件,那么 Hive解析器会将 Join 操作转换成 CommonJoin,在 reduce 阶段完成 join。容易发生倾斜。可以用 MapJoin 将小表全部加装到内存,在 map 端完成 join,避免 reducer 处理。
两种实现方式:
1、开启自动的 MapJoin
通过修改以下配置启用自动的 mapjoin:
set hive.auto.convert.join = true;
(该参数为 true 时,Hive 自动对左边的表统计量,如果是小表就加入内存,即对小表使用 Map join)
2、SQL 方式,在 SQL 语句中添加 MapJoin 标记(mapjoin hint)
语法:
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM smallTable JOIN bigTable ON smallTable.key = bigTable.key; |
相关配置参数:
1. hive.mapjoin.smalltable.filesize; #默认 25M
(大表小表判断的阈值,如果表的大小小于该值则会被加载到内存中运行)
2. hive.ignore.mapjoin.hint;
(默认值:true;是否忽略 mapjoin hint 即 mapjoin 标记)
3. hive.auto.convert.join.noconditionaltask;
(默认值:true;将普通的 join 转化为普通的 mapjoin 时,是否将多个 mapjoin 转化为一个 mapjoin)
4. hive.auto.convert.join.noconditionaltask.size;
(将多个 mapjoin 转化为一个 mapjoin 时,其表的最大值)
hive.groupby.skewindata 避免数据倾斜,一个 MR 变 2 个:MR->MR,先做一次数据合并,有时候一个 MR 都即将 100%的时候卡着不动了,分两个完成。
9.2 大表 join 大表
1. 空 key 过滤:
(1) 有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。
(2) 一般在 ETL 数据清洗时便会对空值进行了处理,所以该条一般情况下意义不大,但是面试的时候可以说。最终目的是 reduce 均衡,防止数据倾斜。
2. 空 key 转换:
(1) 有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上
9.3 Map-Side 聚合
- 默认情况下,Map 阶段相同 key 分送到一个 reduce,当某个 key 的数据过大时就会发生数据倾斜。
- 并不是所有的聚合都需要再 reduce 端完成,可以先在 map 端进行聚合,最后再在reduce 端聚合。如同 combiner
- 通过设置以下参数开启在 Map 端的聚合
set hive.map.aggr=true;
|
9.4 count(distinct)去重统计
数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job很难完成,一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换
每个 reduce 任务处理的数据量,默认值 256MB set hive.exec.reducers.bytes.per.reducer=256000000 Select count(distinct imei) from jizhan; 转换为 Select count(imei) from (select imei from jizhan group by imei) tmp |
虽然会多一个 Job 来完成,但在数据量大的情况下,这个绝对值得。
9.5 笛卡尔积
尽量避免笛卡尔积,即避免 join 的时候不加 on 条件,或者无效的 on 条件。Hive 只使用1 个 reduce 来完成笛卡尔积,所以效率特别低。
10 合适设置 Map 与 Reduce 数量
10.1 合理设置 Map Task 数量
通常情况下,作业会通过 input 的目录产生一个或者多个 map 任务。
主要决定因素:input 的文件总个数,input 的文件大小,集群设置的文件块大小。block 块,split_size,文件个数 split 切片数量决定 map task 数量
是不是 map 数越多越好?
答:不是。如果一个任务有很多小文件,则每个小文件都会被当成一个 split切片,用一个 map 任务来完成,执行真是业务逻辑运算的时间远远小于 map 任务的启动和初始化的时间,就会造成很大的资源浪费。另外,同时可执行的 map 数也是受限
的。如何优化,答案当然是减少 map 的数量,比如通过合并小文件减少 map 数量,见10.2。
是不是保证每个 map 处理接近 128M 的文件块,就高枕无忧了?
答:不一定,比如一个 128MB(或者接近该值)的文件,默认情况会用一个map 去完成,但是这个文件可能只有很少的小字段,却又几千万的记录,如果 map 处理的逻辑比较复杂,用一个 map 任务去做,肯定比较耗时。如何解决?当然是增加 map
的个数。见 4.10.3。
10.2 合并小文件
小文件数目多,容易在文件存储端造成压力,给 hdfs 造成压力,影响效率设置合并属性
是否合并 map 输出文件:hive.merge.mapfiles=true
是否合并 reduce 输出文件:hive.merge.mapredfiles=true;
合并文件的大小:hive.merge.size.per.task=256*1000*1000
CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat |
10.3 复杂文件增加 map 数
对比设置 split 逻辑切块的大小(minSize,maxSize)
set mapred.max.split.size=256000000(默认值)
splitSize = Math.max(minSize,Math.min(maxSize,blockSize))
一个 split 的最大值,即每个 map 处理文件的最大值。让该值小于 blocksize 就可以增加 map 的个数。
注意:set mapred.min.split.size=1;(默认值)
扩展 Map 数量相关的参数
mapred.min.split.size.per.node
一个节点上 split 的最小值
mapred.min.split.size.per.rack
一个机架上 split 的最小值
10.4 合理设置 Reduce 数
1. 方式一:强制指定 reduce 任务的数量
set mapred.reduce.tasks=5 |
2. 方式二:
(1) 设置每个 reduce 处理的数据量默认值约等于 256M
set hive.exec.reducers.bytes.per.reducer=256000000; |
(2) 每个任务最大的 reduce 数,默认 1009。
set hive.exec.reducers.max=1009; |
(3) 计算 reduce 数 2560 * 1024 *1024
num = min(hive.exec.reducers.max, 总输入数据量/hive.exec.reducers.bytes.per.reducer) |
总结:reduce 个数并不是越多越好,过多的启动和初始化 reduce 也会消耗时间和资源;
另外过多的 reduce 会生成很多个结果文件,同样产生了小文件的问题。