hive原理及执行任务流程优化

  • Hive架构
    • hive原理及执行任务流程优化

    • hive原理及执行任务流程优化

  • Hive主要有QL,MetaStore和Serde三大核心组件构成。
    • QL:是编译器也是Hive中最核心的部分。
      • Driver模块的工作是将HQL语句转化为MapReduce调用.
      • 包括主要的三个阶段:
      • 编译:Compile,生成执行计划
      • 优化:Optimize,优化执行计划(当前的Hive实现是在执行前做一次唯一的优化,没有反馈的过程,这使得优化工作只能是rule-based,做不到cost-based)。
      • 执行:Execute,将执行计划提交给Hadoop。
    • Serde:是Serializer和Deserializer的缩写,用于序列化和反序列化数据,即读写数据。
    • MetaStore :对外暴露Thrift API,用于元数据的修改。比如表的增删改查,分区的增删改查,表的属性的修改,分区的属性的修改等。
  • Hive的数据模型
    • hive原理及执行任务流程优化

    • Hive的数据存储在HDFS上,基本存储单位是表或者分区。
    • Hive内部把表或者分区称作SD,即Storage Descriptor。一个SD通常是一个HDFS路径,或者其它文件系统路径。SD的元数据信息存储在Hive MetaStore中,如文件路径,文件格式,列,数据类型,分隔符。Hive默认的分格符有三种,分别是^A、^B和^C,即ASCii码的1、2和3,分别用于分隔列,分隔列中的数组元素,和元素Key-Value对中的Key和Value。
    • MetaStore:对外Thrift API,用于元数据的修改
    • Hive的核心是Driver,Driver的核心是SemanticAnalyzer。 Hive实际上是一个SQL到Hadoop作业的编译器。
    • Hive目前支持MapReduce, Tez, Spark执行引擎
  • Hive的编译流程:
    • hive原理及执行任务流程优化

    • hive原理及执行任务流程优化

    • (1)ParseDriver内部词法分析,生成AST(抽象语法树)。
    • (2)SemanticAnalyzer分析AST树,生成 QB查询块(QueryBlock)。
    • (3)从metastore中获取表的信息,SemanticAnalyzer.getMetaData完成。
    • (4)生成逻辑执行计划,SemanticAnalyzer.genPlan完成。
    • (5)优化逻辑执行计划,Optimizer完成,ParseContext作为上下文信息进行传递。
    • (6)生成物理执行计划,SemanticAnalyzer.genMapRedTasks完成。
    • (7)物理计划优化,PhysicalOptimizer完成,PhysicalContext作为上下文信息进行传递。
    • (8)执行生成的物理计划,获得结果。
    • (1)~(7)在Driver的compile中完成,(8)在Driver的execute中完成,在执行阶段一个一个Task运行,不会改变物理计划。
  •  
  • Group By的执行任务:
  • 例如一条SQL语句:
    • INSERT INTO TABLE pageid_age_sum
    • SELECT pageid, age, count(1)
    • FROM pv_users
    • GROUP BY pageid, age;
    • hive原理及执行任务流程优化

    • 将每个网页的阅读数按年龄进行分组统计。MapReduce就是一个Group By的过程,这个SQL翻译成MapReduce就是相对简单的。
    • hive原理及执行任务流程优化

    • 首先在Map端,每一个Map读取一部分表的数据,通常是64M或者256M,然后按需要Group By的Key分发到Reduce端。经过Shuffle Sort,每一个Key再在Reduce端进行聚合(这里是Count),然后就输出了最终的结果。
  • distinct的执行任务:
    • Distinct在实现原理上与Group By类似。
    • 例如:SELECT pageid, COUNT(DISTINCT userid) FROM page_view GROUP BY pageid
      • hive原理及执行任务流程优化

    • Hive 实现成MapReduce的原理如下:
      • hive原理及执行任务流程优化

  • 也就是说Map分发到Reduce的时候,会使用pageid和userid作为联合分发键,再去聚合(Count),输出结果。
  • 从原理上可以看出,当遇到Group By的查询时,会按Group By 键进行分发?如果键很多,很可能会撑爆了机器硬盘,Impala或Spark,为了快,key在内存中,内存爆是经常的。
  • join的执行任务:
    • 例如一条SQL语句:
      • INSERT INTO TABLE pv_users
      • SELECT pv.pageid, u.age
      • FROM page_view pv JOIN user u ON (pv.userid = u.userid);
    • hive原理及执行任务流程优化

      • 把访问和用户表进行关联,生成访问用户表,Hive的Join也是通过MapReduce来完成的。
      • hive原理及执行任务流程优化

      • 上面的查询,在MapReduce的Join的实现过程如下:

        hive原理及执行任务流程优化

    • Map端会分别读入各个表的一部分数据,把这部分数据进行打标,如pv表打标1,user表打标2.
    • Map读取是分布式进行的,标完标记后分发到Reduce端,Reduce 端根据Join Key,也就是关联键进行分组。然后按打的标记进行排序,如图上的Shuffle Sort。
    • 在每一个Reduce分组中,相同的Key为111的在一起,也就是一台机器上。同时,pv表的数据在这台机器的上端,user表的数据在这台机器的下端。
    • 这时,Reduce把pv表的数据读入到内存里,然后逐条与硬盘上user表的数据做Join就可以了。
    • 从这个实现可以看出,我们在写Hive Join的时候,应该尽可能把小表(分布均匀的表)写在左边,大表(或倾斜表)写在右边。这样可以有效利用内存和硬盘的关系,提高Hive的处理能力。
    • 同时由于使用Join Key进行分发, Hive也只支持等值Join,不支持非等值Join。由于Join和Group By一样存在分发,所以也同样存在着倾斜的问题,所以Join也要对抗倾斜数据,提升查询执行性能。
  • Map join的执行任务:
    • 有一种执行非常快的Join叫Map Join 。
    • 手动的Map Join SQL如下(pv是小表):
      • INSERT INTO TABLE pv_users
      • SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age
      • FROM page_view pv JOIN user u
      • ON (pv.userid = u.userid);
    • 同上边例子,用Map Join执行
    • hive原理及执行任务流程优化

    • Map Join通常只适用于一个大表和一个小表做关联的场景,例如事实表和维表的关联。
    • 原理如上图,用户可以手动指定哪个表是小表,然后在客户端把小表打成一个哈希表序列化文件的压缩包,通过分布式缓存均匀分发到作业执行的每一个结点上。然后在结点上进行解压,在内存中完成关联。
    • Map Join全过程不会使用Reduce,非常均匀,不会存在数据倾斜问题。默认情况下,小表不应该超过25M。在实际使用过程中,手动判断是不是应该用Map Join太麻烦了,而且小表可能来自于子查询的结果。
    • Hive有一种稍微复杂一点的机制,叫Auto Map Join
    • hive原理及执行任务流程优化

    • 图上左边是优化前的,右边是优化后的,Hive原理中提到的物理优化器(Physical Optimizer)它其中一个功能就是把Join优化成Auto Map Join
    • 优化过程是把Join作业前面加上一个条件选择器ConditionalTask和一个分支。左边的分支是MapJoin,右边的分支是Common Join(Reduce Join)
      • 这个时候,我们在执行的时候,就由这个Conditional Task 进行实时路径选择,遇到小于25M走左边,大于25M走右边。
    • 在比较新版的Hive中,Auto Mapjoin是默认开启的。如果没有开启,可以使用一个开关, set hive.auto.convert.join=true开启。
    • 当然,Join也会遇到和上面的Group By一样的倾斜问题。