Hive数据仓库中历史拉链表实践

数据准备

  1. 在mysql中创建测试表orders
    CREATE TABLE `orders` (
      `orderid` int(11) NOT NULL AUTO_INCREMENT,
      `status` varchar(255) NOT NULL,
      `createtime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      `modifiedtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`orderid`)
    ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
    
    插入测试数据
    INSERT INTO `orders` VALUES ('1', '创建', '2019-03-21 10:19:53', '2019-03-21 10:20:33');
    INSERT INTO `orders` VALUES ('2', '创建', '2019-03-21 10:20:42', '2019-03-21 10:20:42');
    INSERT INTO `orders` VALUES ('3', '创建', '2019-03-21 10:20:55', '2019-03-21 10:20:55');
    INSERT INTO `orders` VALUES ('4', '创建', '2019-03-21 10:21:00', '2019-03-21 10:21:00');
    INSERT INTO `orders` VALUES ('5', '创建', '2019-03-21 10:21:05', '2019-03-21 10:21:05');
    
    Hive数据仓库中历史拉链表实践
  2. 创建hive表
    1. 在数据仓库的ODS层,创建增量表,按天分区,存放每天的增量数据
      beeline -u jdbc:hive2://192.168.1.101:10000 -n hive -e  "
      	CREATE TABLE ods_orders_inc (
      	    orderid INT,
      	    createtime STRING,
      	    modifiedtime STRING,
      	    status STRING
          ) PARTITIONED BY (day STRING) 
          row format delimited fields terminated by '\t' 
          stored AS textfile;
      "
      
    2. 在数据仓库的DW层,创建历史数据拉链表,存放历史状态数据
      beeline -u jdbc:hive2://192.168.1.101:10000 -n hive -e  "
      	CREATE TABLE dw_orders_his (
      	    orderid INT,
      	    createtime STRING,
      	    modifiedtime STRING,
      	    status STRING,
      	    dw_start_date STRING,
      	    dw_end_date STRING
      	) 
      	row format delimited fields terminated by '\t' 
      	stored AS textfile;
      "
      

全量初始化

  1. 利用sqoop抽取全量数据到ODS增量表中
    sqoop import \
    --connect "jdbc:mysql://192.168.1.101:3306/testdb" \
    --username "root" \
    --password "123456" \
    --table "orders" \
    --split-by "orderid" \
    --columns "orderid,createtime,modifiedtime,status" \
    --where "modifiedtime < '2019-03-21'" \
    --fields-terminated-by '\t' \
    --hive-import \
    --hive-database default \
    --hive-table "ods_orders_inc" \
    --hive-partition-key "day" \
    --hive-partition-value '2019-03-20' \
    --null-string '\\N' \
    --null-non-string '\\N' \
    --hive-drop-import-delims \
    --m 1
    
  2. 查看ods_orders_inc表数据 Hive数据仓库中历史拉链表实践
  3. 将ODS数据刷新到DW中
    beeline -u jdbc:hive2://192.168.1.101:10000 -n hive -e  "
    	INSERT overwrite TABLE dw_orders_his
    	SELECT orderid,
    		   status,
    	       createtime,
    	       modifiedtime,
    	       to_date(createtime) AS dw_start_date,
    	       '9999-12-31' AS dw_end_date
    	FROM ods_orders_inc
    	WHERE DAY = '2019-03-20';
    "
    
    Hive数据仓库中历史拉链表实践

增量抽取

  1. 每天从业务表中,将前一天的增量数据抽取到ODS层的增量表中

    sqoop import \
    --connect "jdbc:mysql://192.168.1.101:3306/hft" \
    --username "root" \
    --password "123456" \
    --table "orders" \
    --split-by "orderid" \
    --columns "orderid,createtime,modifiedtime,status" \
    --where "(createtime  >= '2019-03-21' and createtime < '2019-03-22') OR (modifiedtime >= '2019-03-21' and modifiedtime < '2019-03-22')"  \
    --fields-terminated-by '\t' \
    --hive-import \
    --hive-database default \
    --hive-table "ods_orders_inc" \
    --hive-partition-key "day" \
    --hive-partition-value '2019-03-21' \
    --null-string '\\N' \
    --null-non-string '\\N' \
    --hive-drop-import-delims \
    --m 1
    

    注意

    1. 增量需要通过业务表中的创建时间和修改时间来确定
    2. 在ODS层按天分区的增量表,最好保留一段时间的数据。比如半年,为了防止某一天的数据有问题而回滚重做数据。
      Hive数据仓库中历史拉链表实践
  2. 增量刷新历史数据
    先把数据放到一张临时表中

    beeline -u jdbc:hive2://192.168.1.101:10000 -n hive -e  "
    DROP TABLE IF EXISTS dw_orders_his_tmp;	
    CREATE TABLE dw_orders_his_tmp AS
    SELECT orderid,
           createtime,
           modifiedtime,
           status,
           dw_start_date,
           dw_end_date
    FROM
      (SELECT a.orderid,
              a.createtime,
              a.modifiedtime,
              a.status,
              a.dw_start_date,
              CASE
                  WHEN b.orderid IS NOT NULL
                       AND a.dw_end_date > '2019-03-21' THEN '2019-03-20'
                  ELSE a.dw_end_date
              END AS dw_end_date
       FROM dw_orders_his a
       LEFT OUTER JOIN
         (SELECT *
          FROM ods_orders_inc
          WHERE DAY = '2019-03-21') b ON (a.orderid = b.orderid)
       UNION ALL SELECT orderid,
                        createtime,
                        modifiedtime,
                        status,
                        to_date(modifiedtime) AS dw_start_date,
                        '9999-12-31' AS dw_end_date
       FROM ods_orders_inc
       WHERE DAY = '2019-03-21' ) x
    ORDER BY orderid,
             dw_start_date;
    "
    

    要点

    UNION ALL的两个结果集中,
    第一个是用历史拉链表left outer join 日期为2019-03-21的增量,能关联上并且dw_end_date > ‘2019-03-21’,说明状态有变化,则把原来的dw_end_date置为2019-03-20,
    关联不上的,说明状态无变化,dw_end_date则不变。
    第二个结果集是直接将日期为2019-03-21的增量数据插入历史拉链表。

  3. 把临时表中数据插入DW层历史拉链表中

    beeline -u jdbc:hive2://192.168.1.101:10000 -n hive -e  "
    	INSERT overwrite TABLE t_dw_orders_his
    	SELECT *
    	FROM t_dw_orders_his_tmp;
    "
    

    Hive数据仓库中历史拉链表实践