Flink如何连接hive
回顾
在上篇文章中,笔者使用的 CDH 版本为 5.16.2,其中 Hive 版本为 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可理解),Flink 源代码本身对 Hive 1.1.0 版本兼容性不好,存在不少问题。为了兼容目前版本,笔者基于 CDH 5.16.2 环境,对 Flink 代码进行了修改,重新打包并部署。
其实经过很多开源项目的实战,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情况下,替换一些 Jar 包,是可以解决兼容性的问题。对于笔者的环境来说,可以使用 Hive 1.2.1 版本的一些 Jar 包来代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的开始部分,笔者会解决这个问题,然后再补充上篇文章缺少的实战内容。
剪不断理还乱的问题
根据读者的反馈,笔者将所有的问题总结为三类:
- Flink 如何连接 Hive 除了 API 外,有没有类似 spark-sql 命令
- 识别不到 Hadoop 环境或配置文件找不到
- 依赖包、类或方法找不到
1. Flink 如何连接 Hive
有的读者不太清楚,如何配置 Flink 连接 Hive 的 Catalog,这里补充一个完整的 conf/sql-client-hive.yaml 示例:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
catalogs: - name: staginghive type : hive hive-conf- dir : /etc/hive/conf hive-version: 1.2.1 execution: planner: blink type : batch time -characteristic: event- time periodic-watermarks-interval: 200 result-mode: table max-table-result-rows: 1000000 parallelism: 1 max-parallelism: 128 min-idle-state-retention: 0 max-idle-state-retention: 0 current-catalog: staginghive current-database: ssb restart-strategy: type : fallback deployment: response-timeout: 5000 gateway-address: "" gateway-port: 0 m: yarn-cluster yn: 2 ys: 5 yjm: 1024 ytm: 2048 |
sql-client-hive.yaml 配置文件里面包含:
- Hive 配置文件 catalogs 中配置了 Hive 的配置文件路径。
- Yarn 配置信息 deployment 中配置了 Yarn 的配置信息。
- 执行引擎信息 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比较稳定,适合传统的批处理作业,而且可以容错,另外中间数据落盘,建议开启压缩功能。除了 batch,Flink 也支持 streaming 模式。
- Flink SQL CLI 工具
类似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 脚本。在 Flink 1.10 版本中,Flink SQL CLI 改进了很多功能,笔者后面讲解。
sql-client.sh 使用方式如下:
1
|
$ bin /sql-client .sh embedded -d conf /sql-client-hive .yaml |
2. 识别不到 Hadoop 环境或配置文件找不到
笔者在上篇文章中提到过,在部署 Flink 的环境上部署 CDH gateway,包括 Hadoop、Hive 客户端,另外还需要配置一些环境变量,如下:
1
2
3
4
|
export HADOOP_CONF_DIR= /etc/hadoop/conf export YARN_CONF_DIR= /etc/hadoop/conf export HIVE_HOME= /opt/cloudera/parcels/CDH/lib/hive export HIVE_CONF_DIR= /etc/hive/conf |
3. 依赖包、类或方法找不到
先查看一下 Flink 家目录下的 lib 目录:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
$ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive- exec -1.1.0-cdh5.16.2.jar ├── hive-metastore-1.1.0-cdh5.16.2.jar ├── libfb303-0.9.3.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar |
如果上面前两个问题都解决后,执行如下命令:
1
|
$ bin /sql-client .sh embedded -d conf /sql-client-hive .yaml |
报错,报错,还是报错:
1
|
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory |
其实在运行 sql-client.sh 脚本前,需要指定 Hadoop 环境的依赖包的路径,建议不要报错一个添加一个,除非有的读者喜欢。这里笔者提示一个方便的方式,即设置 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)环境变量:
1
|
export HADOOP_CLASSPATH=`hadoop classpath` |
再次执行:
1
|
$ bin /sql-client .sh embedded -d conf /sql-client-hive .yaml |
很抱歉,继续报错:
1
|
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway. local .ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway. local .LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client |
这里就是 Hive 1.1.0 版本的 Jar 包与 Flink 出现版本不兼容性的问题了,解决方法是:
- 下载 apache-hive-1.2.1 版本
- 替换 Flink lib 目录下的 Hive Jar 包 删除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然后添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次查看 lib 目录:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
$ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive- exec -1.2.1.jar ├── hive-metastore-1.2.1.jar ├── libfb303-0.9.2.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar |
最后再执行:
1
|
$ bin /sql-client .sh embedded -d conf /sql-client-hive .yaml |
这时,读者就可以看到手握栗子的可爱小松鼠了。
Flink SQL CLI 实践
在 Flink 1.10 版本(目前为 RC1 阶段) 中,Flink 社区对 SQL CLI 做了大量的改动,比如支持 View、支持更多的数据类型和 DDL 语句、支持分区读写、支持 INSERT OVERWRITE 等,实现了更多的 TableEnvironment API 的功能,更加方便用户使用。
接下来,笔者详细讲解 Flink SQL CLI。
0. Help
执行下面命令,登录 Flink SQL 客户端:
1
2
|
$ bin /sql-client .sh embedded -d conf /sql-client-hive .yaml Flink SQL> |
执行 HELP,查看 Flink SQL 支持的命令,如下为大部分常用的:
- CREATE TABLE
- DROP TABLE
- CREATE VIEW
- DESCRIBE
- DROP VIEW
- EXPLAIN
- INSERT INTO
- INSERT OVERWRITE
- SELECT
- SHOW FUNCTIONS
- USE CATALOG
- SHOW TABLES
- SHOW DATABASES
- SOURCE
- USE
- SHOW CATALOGS
1. Hive 操作
- 1.1 创建表和导入数据
为了方便读者进行实验,笔者使用 ssb-dbgen 生成测试数据,读者也可以使用测试环境已有的数据来进行实验。
具体如何在 Hive 中一键式创建表并插入数据,可以参考笔者早期的项目 https://github.com/MLikeWater/ssb-kylin
- 1.2 Hive 表
查看上个步骤中创建的 Hive 表:
01
02
03
04
05
06
07
08
09
10
11
|
0: jdbc:hive2: //xx .xxx.xxx.xxx:10000> show tables; +--------------+--+ | tab_name | +--------------+--+ | customer | | dates | | lineorder | | p_lineorder | | part | | supplier | +--------------+--+ |
读者可以对 Hive 进行各种查询,对比后面 Flink SQL 查询的结果。
2. Flink 操作
- 2.1 通过 HiveCatalog 访问 Hive 数据库
登录 Flink SQL CLI,并查询 catalogs:
1
2
3
4
5
6
|
$ bin /sql-client .sh embedded -d conf /sql-client-hive .yaml Flink SQL> show catalogs; default_catalog staginghive Flink SQL> use catalog staginghive; |
通过 show catalogs 获取配置的所有 catalog。由于笔者在 sql-client-hive.yaml 文件中设置了默认的 catalog,即为 staginghive。如果需要切换到其他 catalog,可以使用 usecatalog xxx。
- 2.2 查询 Hive 元数据
通过 Flink SQL 查询 Hive 数据库和表:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# 查询数据库 Flink SQL> show databases; ... ssb tmp ... Flink SQL> use ssb; # 查询表 Flink SQL> show tables; customer dates lineorder p_lineorder part supplier # 查询表结构 Flink SQL> DESCRIBE customer; root |-- c_custkey: INT |-- c_name: STRING |-- c_address: STRING |-- c_city: STRING |-- c_nation: STRING |-- c_region: STRING |-- c_phone: STRING |-- c_mktsegment: STRING |
这里需要注意,Hive 的元数据在 Flink catalog 中都以小写字母使用。
- 2.3 查询
接下来,在 Flink SQL CLI 中查询一些 SQL 语句,完整 SQL 参考 https://github.com/MLikeWater/ssb-kylin 的 README。
目前 Flink SQL 解析 Hive 视图元数据时,会遇到一些 Bug,比如执行 Q1.1 SQL:
1
2
3
4
5
6
7
8
9
|
Flink SQL> select sum (v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Tabeorder ' not found; did you mean ' LINEORDER'? |
Flink SQL 找不到视图中的实体表。
p_lineorder 表是 Hive 中的一张视图,创建表的语句如下:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
CREATE VIEW P_LINEORDER AS SELECT LO_ORDERKEY, LO_LINENUMBER, LO_CUSTKEY, LO_PARTKEY, LO_SUPPKEY, LO_ORDERDATE, LO_ORDERPRIOTITY, LO_SHIPPRIOTITY, LO_QUANTITY, LO_EXTENDEDPRICE, LO_ORDTOTALPRICE, LO_DISCOUNT, LO_REVENUE, LO_SUPPLYCOST, LO_TAX, LO_COMMITDATE, LO_SHIPMODE, LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE FROM ssb.LINEORDER; 但是对于 Hive 中视图的定义,Flink SQL 并没有很好地处理元数据。为了后面 SQL 的顺利执行,这里笔者在 Hive 中删除并重建该视图: 0: jdbc:hive2: //xx .xxx.xxx.xxx:10000> create view p_lineorder as select lo_orderkey, lo_linenumber, lo_custkey, lo_partkey, lo_suppkey, lo_orderdate, lo_orderpriotity, lo_shippriotity, lo_quantity, lo_extendedprice, lo_ordtotalprice, lo_discount, lo_revenue, lo_supplycost, lo_tax, lo_commitdate, lo_shipmode, lo_extendedprice*lo_discount as v_revenue from ssb.lineorder; |
然后继续在 Flink SQL CLI 中查询 Q1.1 SQL:
1
2
3
4
5
6
7
8
9
|
Flink SQL> select sum (v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; revenue 894280292647 |
继续查询 Q2.1 SQL:
[Bash shell] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
Flink SQL> select sum (lo_revenue) as lo_revenue, d_year, p_brand > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join part on lo_partkey = p_partkey > left join supplier on lo_suppkey = s_suppkey > where p_category = 'MFGR#12' and s_region = 'AMERICA' > group by d_year, p_brand > order by d_year, p_brand; lo_revenue d_year p_brand 819634128 1998 MFGR #1206 877651232 1998 MFGR #1207 754489428 1998 MFGR #1208 816369488 1998 MFGR #1209 668482306 1998 MFGR #1210 660366608 1998 MFGR #1211 862902570 1998 MFGR #1212 ... |
最后再查询一个 Q4.3 SQL:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
Flink SQL> select d_year, s_city, p_brand, sum (lo_revenue) - sum (lo_supplycost) as profit > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join customer on lo_custkey = c_custkey > left join supplier on lo_suppkey = s_suppkey > left join part on lo_partkey = p_partkey > where c_region = 'AMERICA' and s_nation = 'UNITED STATES' > and (d_year = 1997 or d_year = 1998) > and p_category = 'MFGR#14' > group by d_year, s_city, p_brand > order by d_year, s_city, p_brand; d_year s_city p_brand profit 1998 UNITED ST9 MFGR #1440 6665681 |
如果读者感兴趣的话,可以查询剩余的 SQL,当然也可以和 Spark SQL 进行比较。另外 Flink SQL 也支持 EXPLAIN,查询 SQL 的执行计划。
- 2.4 创建视图
同样,可以在 Flink SQL CLI 中创建和删除视图,如下:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
Flink SQL> create view p_lineorder2 as > select lo_orderkey, > lo_linenumber, > lo_custkey, > lo_partkey, > lo_suppkey, > lo_orderdate, > lo_orderpriotity, > lo_shippriotity, > lo_quantity, > lo_extendedprice, > lo_ordtotalprice, > lo_discount, > lo_revenue, > lo_supplycost, > lo_tax, > lo_commitdate, > lo_shipmode, > lo_extendedprice * lo_discount as v_revenue > from ssb.lineorder; [INFO] View has been created. |
这里笔者需要特别强调的是,目前 Flink 无法删除 Hive 中的视图:
[Bash shell] 纯文本查看 复制代码
1
2
3
|
Flink SQL> drop view p_lineorder; [ERROR] Could not execute SQL statement. Reason: The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed. |
- 2.5 分区操作
Hive 数据库中创建一张分区表:
1
2
3
4
5
|
CREATE TABLE IF NOT EXISTS flink_partition_test ( id int, name string ) PARTITIONED BY (day string, type string) stored as textfile; |
接着,通过 Flink SQL 插入和查询数据:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# 插入静态分区的数据 Flink SQL> INSERT INTO flink_partition_test PARTITION ( type = 'Flink' , `day`= '2020-02-01' ) SELECT 100001, 'Flink001' ; # 查询 Flink SQL> select * from flink_partition_test; id name day type 100001 Flink001 2020-02-01 Flink # 插入动态分区 Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark' , '2020-02-02' , 'SparkSQL' ; # 查询 Flink SQL> select * from flink_partition_test; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink # 动态和静态分区结合使用类似,不再演示 # 覆盖插入数据 Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION ( type = 'Flink' ) SELECT 100002, 'Spark' , '2020-02-08' , 'SparkSQL-2.4' ; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink |
字段 day 在 Flink 属于关键字,要特殊处理。
- 2.6 其他功能
- 2.6.1 函数
Flink SQL 支持内置的函数和自定义函数。对于内置的函数,可以执行 show functions 进行查看,这一块笔者以后会单独介绍如何创建自定义函数。
- 2.6.2 设置参数
Flink SQL 支持设置环境参数,可以使用 set 命令查看和设置参数:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
Flink SQL> set ; deployment.gateway-address= deployment.gateway-port=0 deployment.m=yarn-cluster deployment.response-timeout=5000 deployment.yjm=1024 deployment.yn=2 deployment.ys=5 deployment.ytm=2048 execution.current-catalog=staginghive execution.current-database=ssb execution.max-idle-state-retention=0 execution.max-parallelism=128 execution.max-table-result-rows=1000000 execution.min-idle-state-retention=0 execution.parallelism=1 execution.periodic-watermarks-interval=200 execution.planner=blink execution.restart-strategy. type =fallback execution.result-mode=table execution. time -characteristic=event- time execution. type =batch Flink SQL> set deployment.yjm = 2048; |
总结
在本文中,笔者通过 Flink SQL 比较详细地去操作 Hive 数据库,以及 Flink SQL 提供的一些功能。
当然,目前 Flink SQL 操作 Hive 数据库还是存在一些问题:
- 目前只支持 TextFile 存储格式,还无法指定其他存储格式 ,只支持 Hive 数据库中 TextFile 存储格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。虽然实现了 RCFile、ORC、Parquet、Sequence 等存储格式,但是无法自动识别 Hive 表的存储格式。如果要使用其他存储格式,需要修改源码,重新编译。不过社区已经对这些存储格式进行了测试,相信不久以后就可以在 Flink SQL 中使用。
- OpenCSVSerde 支持不完善:如果读者使用 TextFile 的 row format serde 为 org.apache.hadoop.hive.serde2.OpenCSVSerde 时,无法正确识别字段类型,会把 Hive 表的字段全部映射为 String 类型。
- 暂时不支持 Bucket 表
- 暂时不支持 ACID 表
- Flink SQL 优化方面功能较少
- 权限控制方面:这方面和 Spark SQL 类似,目前基于 HDFS ACL 控制,暂时还没有实现 Sentry 或 Ranger 控制权限,不过目前 Cloudera 正在开发基于 Ranger 设置 Spark SQL 和 Hive 共享访问权限的策略,实现行/列级控制以及审计信息。
Flink 社区发展很快,所有这些问题只是暂时的,随着新版本的发布会被逐个解决。
如果 Flink SQL 目前不满足的需求,建议使用 API 方式来解决问题。