Flink on Zeppelin-2
Flink Interpreter类型
首先介绍下Zeppelin中的Flink Interpreter类型。Zeppelin的Flink Interpreter支持Flink的所有API (DataSet, DataStream, Table API )。语言方面支持Scala,Python,SQL。下图是Zeppelin中支持的不同场景下的Flink Interpreter。
配置Flink Interpreter
下图例举了所有重要的Flink配置信息,除此之外你还可以配置任意Flink的Configuration(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html)
内置入口变量
Flink Interpreter (%flink) 为用户自动创建了下面6个变量作为Flink Scala程序的入口。
-
senv
(StreamExecutionEnvironment), -
benv
(ExecutionEnvironment) -
stenv
(StreamTableEnvironment for blink planner) -
btenv
(BatchTableEnvironment for blink planner) -
stenv_2
(StreamTableEnvironment for flink planner) -
btenv_2
(BatchTableEnvironment for flink planner)
PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了6个python变量作为PyFlink程序的入口
-
s_env (StreamExecutionEnvironment),
-
b_env (ExecutionEnvironment)
-
st_env
(StreamTableEnvironment for blink planner) -
bt_env
(BatchTableEnvironment for blink planner) -
st_env_2
(StreamTableEnvironment for flink planner) -
bt_env_2
(BatchTableEnvironment for flink planner)
Blink/Flink Planner
Flink 1.10中有2种table api的planner:flink
& blink
.
-
如果你用DataSet api以及需要把DataSet转换成Table,那么就需要使用Flink planner的TableEnvironment (
btenv_2
andstenv_2
). -
其他场景下, 我们都会建议用户使用
blink
planner. 这也是Flink sql使用的planner(%flink.bsql
&%flink.ssql
)
使用Flink Batch SQL
%flink.bsql 是用来执行Flink的batch sql. 运行 help
命令可以得到所有可用的命令
总的来说,Flink Batch SQL可以用来做2大任务:
-
使用
insert into
语句来做 Batch ETL -
使用
select
语句来做BI 数据分析
基于Bank数据的Batch ETL
下面我们基于Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做Batch ETL任务。
-
首先用Flink Sql创建一个raw 数据的source table,以及清洗干净后的sink table。
-
然后再定义Table Function来parse raw data。
-
接下来就可以用insert into语句来进行数据转换(source table --> sink table)
-
用select语句来Preview最终数据,验证insert into语句的正确性
基于Bank数据的BI数据分析
经过上面的数据清洗工作,接下来就可以对数据进行分析了。用户不仅可以使用标准的SQL Select语句进行分析,也可以使用Zeppelin的dynamic forms来增加交互性(TextBox,Select,Checkbox)
使用Flink UDF
SQL虽然强大,但表达能力毕竟有限。有时候就要借助于UDF来表达更复杂的逻辑。Flink Interpreter 支持2种UDF (Scala + Python)。下面是2个简单的例子。
Scala UDF
%flink
class ScalaUpper extends ScalarFunction {
def eval(str: String) = str.toUpperCase
}
btenv.registerFunction("scala_upper", new ScalaUpper())
Python UDF
%flink.pyflink
class PythonUpper(ScalarFunction):
def eval(self, s):
return s.upper()
bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
对Hive数据的数据分析
除了可以分析Flink SQL创建的table之外,Flink也可以分析Hive上已有的table。如果要让Flink Interpreter使用Hive,那么需要做以下配置
-
设置
zeppelin.flink.enableHive
为true
-
Copy 下面这些 dependencies 到flink的 lib 目录
-
flink-connector-hive_{scala_version}-{flink.version}.jar
-
flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
-
flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
-
hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
-
在Flink interpreter setting 里或者 zeppelin-env.sh里指定
HIVE_CONF_DIR
-
在Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的Hive版本
下面就用一个简单的例子展示如何在Zeppelin中用Flink查询Hive table
1. 用Zeppelin的jdbc interpreter查询hive tables
2. 用Flink sql 查询 hive table的schema
3. 用Flink Sql 查询hive table
本文只是简单介绍如何在Zeppelin中使用Flink SQL + UDF,关于更多Flink SQL和UDF请参考Flink官方文档