Apache Flink:如何使用Flink DataSet API从一个数据集创建两个数据集
问题描述:
我正在使用Flink 0.10.1的DataSet API编写应用程序。 我可以使用Flink中的单个操作员获得多个收集器吗?Apache Flink:如何使用Flink DataSet API从一个数据集创建两个数据集
我想要做的是什么样的东西如下:
val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
(iterator, collector1, collector2) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector1.collect(elem1)
collector2.collect(elem2)
}
}
}
目前我打电话mapPartition两次从一个源数据集使两个数据集。
val lines = env.readTextFile(...)
val out_small = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem1)
}
}
}
val out_large = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem2)
}
}
}
由于doParsing功能是相当昂贵的,我想每行只有一次调用它。
p.s.如果你能让我知道其他方法来以更简单的方式做这种事情,我将非常感激。
答
Flink不支持多个收集器。但是,可以通过增加一个额外的字段,其指示输出类型更改解析步骤的输出:
val lines = env.readTextFile(...)
val intermediate = lines **someOp** {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(0, elem1) // 0 indicates small
collector.collect(1, elem2) // 1 indicates large
}
}
}
接下来消耗输出intermediate
两次并过滤每一个用于所述第一属性。第一个过滤器为0
第二个过滤器为1
(您也添加了一个投影来摆脱第一个属性)。
+---> filter("0") --->
|
intermediate --+
|
+---> filter("1") --->