Flink DataSet Transformations【翻译】
在前面的文章中介绍了一下DataStream的一些Operator的操作,这篇文章主要讲述DataSet的一些Transformations操作。它主要有以下操作:
下面会逐一进行讲解
Map
Map操作为DataSet提供了一个用户自定义的map函数,它实现了一对一的映射,并且该函数只能返回一个元素(这里可以实现MapFunction
接口或者RichMapFunction
来定义用户自己的逻辑)。
例如下面的示例将一个元素为Tuple元组的DataSet转换成了一个元素为整数的DataSet:
// MapFunction that adds two integer values
public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
@Override
public Integer map(Tuple2<Integer, Integer> in) {
return in.f0 + in.f1;
}
}
// [...]
DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
DataSet<Integer> intSums = intPairs.map(new IntAdder());
FlatMap
FlatMap操作可以为DataSet的每一个元素定义一个用户自定义的FlatMao函数,它和map类似,不同的是它的返回结果可以是任意个(包括0个)。(可以实现FlatMapFunction
接口或者RichFlatMapFucntion
接口)。
例如下面的示例将DataSet中的每一行文本转换为单词:
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String, String> {
@Override
public void flatMap(String value, Collector<String> out) {
for (String token : value.split("\\W")) {
out.collect(token);
}
}
}
// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());
MapPartition
Mappartition操作可以使用单个函数去调用处理一个并行的分区,它可以自定义MapPartitionFunction获得每一个分区的Iterable
集合,然后通过处理输出一个任意个数的结果值,每一个分区操作的元素依赖于它的并行度和之前的操作。(该操作的效率高于map操作,通常在我们需要连接一些资源,比如数据库等操作,可以使用盖草走对于每一个分区打开一个连接,避免资源过度消耗)。
下面的例子计算DataSet中的每一个分区的文本的单词数:
public class PartitionCounter implements MapPartitionFunction<String, Long> {
public void mapPartition(Iterable<String> values, Collector<Long> out) {
long c = 0;
for (String s : values) {
c++;
}
out.collect(c);
}
}
// [...]
DataSet<String> textLines = // [...]
DataSet<Long> counts = textLines.mapPartition(new PartitionCounter());
Filter
Filter
Filter操作可以让用户使用自定义的FilterFunction函数去实现自定义逻辑,最后保留该函数中返回true的元素。
下面操作从一个DataSet中删除所有小于等于0的整数:
// FilterFunction that filters out all Integers smaller than zero.
public class NaturalNumberFilter implements FilterFunction<Integer> {
@Override
public boolean filter(Integer number) {
return number >= 0;
}
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
Projection of Tuple DataSet
该操作可以理解为投影操作,它会删除或者移动一个Tuple DataSet中某些字段,我们可以使用project(int...)
函数去保留对应tuple中的顺序的索引位置的字段。该操作不需要我们自定义函数就可以直接使用(一般我们可以使用该操作进行一些数据的清洗与转换)。
例如下面实例只保留了第一个和第三个字段,并且交换了他们的位置:
DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
在有的时候,java编译器不能推断出project
操作的类型,这会造成一个问题:当你在project操作之后再调用其他操作,如下所示:
DataSet<Tuple5<String,String,String,String,String>> ds = ....
DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);
我们通过如下方式解决上面的问题:
DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);
Transformations on Grouped DataSet
在DataSet中,我们也可以进行分组操作,下面有多种方式可以指定key进行分组:
- key expressions
- a key-selector function
- one or more field position keys (Tuple DataSet only)
- Case Class fields (Case Classes only)
Reduce on Grouped DataSet:
该操作和DataStream中的Reduce一样,可以将两个集合中的元素按照用户自定义的逻辑进行合并,例如下面是一个简单wordcount示例:
// some ordinary POJO
public class WC {
public String word;
public int count;
// [...]
}
// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
@Override
public WC reduce(WC in1, WC in2) {
return new WC(in1.word, in1.count + in2.count);
}
}
// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
// DataSet grouping on field "word"
.groupBy("word")
// apply ReduceFunction on grouped DataSet
.reduce(new WordCounter());
下面给出groupBy的相关配套操作,更加具体的使用情况可以参考官方文档,有详细的用法介绍【https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html】
Distinct
依据元素的所有字段或者字段的子集,删除DataSet中所有重复的数据,例如下面的例子可以删除重复的元素:
DataSet<Tuple2<Integer, Double>> input = // [...]
DataSet<Tuple2<Integer, Double>> output = input.distinct();
也可以根据字段的索引来进行去重操作,如下所示:
DataSet<Tuple2<Integer, Double, String>> input = // [...]
DataSet<Tuple2<Integer, Double, String>> output = input.distinct(0,2);
或者可以实现KeySelector
接口实现自定义key,如下所示:
private static class AbsSelector implements KeySelector<Integer, Integer> {
private static final long serialVersionUID = 1L;
@Override
public Integer getKey(Integer t) {
return Math.abs(t);
}
}
DataSet<Integer> input = // [...]
DataSet<Integer> output = input.distinct(new AbsSelector());
也可以使用字段名来进行去重操作,如下所示:
// some ordinary POJO
public class CustomType {
public String aName;
public int aNumber;
// [...]
}
DataSet<CustomType> input = // [...]
DataSet<CustomType> output = input.distinct("aName", "aNumber");
Join
对于join操作,它和DataStream中的操作类似,它可以分为cross join,innner join,outer join等几大类,具体详情可以参考之前的一篇文章【Flink Operator之CoGroup、Join以及Connect】,也可以参考官方文档。
这里重点讲述一下join hit的几种策略,例如下面我们可以显示的指定哪个dataset是小的数据集,哪个是大的,这样flink就会进行一些优化操作:
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result1 =
// hint that the second DataSet is very small
input1.joinWithTiny(input2)
.where(0)
.equalTo(0);
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result2 =
// hint that the second DataSet is very large
input1.joinWithHuge(input2)
.where(0)
.equalTo(0);
几种策略如下:
对于outerjoin也有相应的策略,与上面类似,具体如下表所示:
Union
该操作主要用于将多个类型相同的DataSets合并成为一个DataSet。如下例所示:
DataSet<Tuple2<String, Integer>> vals1 = // [...]
DataSet<Tuple2<String, Integer>> vals2 = // [...]
DataSet<Tuple2<String, Integer>> vals3 = // [...]
DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2).union(vals3);
对于一些与分区相关的操作如下表所示:
First-n
无论是批处理还是流处理,该操作都非常有用,我们可以根据自己实现的一些逻辑获取前N个我们需要的元素,具体如下例所示:
DataSet<Tuple2<String, Integer>> in = // [...]
// Return the first five (arbitrary) elements of the DataSet
DataSet<Tuple2<String, Integer>> out1 = in.first(5);
// Return the first two (arbitrary) elements of each String group
DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
.first(2);
// Return the first three elements of each String group ordered by the Integer field
DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.first(3);
参考资料:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html