六、Flink源码分析系列之--Window理解
文章目录
1.Flink Window 概述
window是flink处理无穷流的核心,window将流拆分成有限大小的“桶”,在其上做运算。
1.1Window Api使用
首先我们看下windown的api如何使用,摘用官网的说明:
- Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows (是否keyby)
.window(...) <- required: "assigner" (window assihner是必须的)
[.trigger(...)] <- optional: "trigger" (else default trigger) (可选,不设置的话会指定一个默认的)
[.evictor(...)] <- optional: "evictor" (else no evictor) (可选,不设置的话就没有)
[.allowedLateness(...)] <- optional: "lateness" (else zero) (可选,不设置为0)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) (可选,不设置的话对于延迟数据没有side oupt)
.reduce/aggregate/fold/apply() <- required: "function" (必须,具体funtion操作)
[.getSideOutput(...)] <- optional: "output tag" (可选,side output 延迟数据处理)
- Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
1.2 window 生命周期
当属于窗口的第一个元素到达,该窗口就会被创建,当时间(时间时间eventtime或处理时间process time)超过了结束时间+用户设置的allowedLeteness时间时,窗口将被删除。flink保证只删除基于时间类型的窗口,不会删除类似global类型的窗口。举个例子:基于eventtime创建一个间隔5分钟,允许1分钟延迟的非重叠窗口,当12:00分到12:05分的第一个元素到达时窗口被创建,当watermarks过了12:06窗口被删除。
1.3 有key窗口和无key窗口
在定义window之前必须指定你的流是否需要key操作。
- 使用key,可以将event的任何字段作为key,使用了keyby后你的窗口计算会由多个task执行,具有相同类型的key会进入同一个task执行。
- 不使用key,表示你的数据流不会被分成多条流执行,所以的窗口都会在同一个task中计算,也就是并行度parallesim 为1。
2.Window内部实现
2.1 window Assigner
当指定了是否使用key后,接下来需要定义window assigner, window assigner定义了如何分配元素给window, 使用key的流用window(),不使用key的流用windowAll()方法。
WindowAssigner负责将元素分配给一个或多个窗口,flink中包含几种预定义窗口分配器,比如tumbling window, sliding window ,session window, global window(出了global window其他均基于时间)。当然还可以通过继承WindowAssinger实现自定义窗口分配器。
- Tumbling Window 翻滚窗口
翻滚窗口的特点是:固定大小,元素不会重复,可以用以下代码指定
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
可以指定偏移量,距离UTC-0的时间偏移
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
- Sliding window 滑动窗口
滑动窗口的特点是:固定大小,元素可以属于多个窗口。例如,设置10分钟大小的窗口,5分钟滑动一次,意味着每隔5分钟会得到一个窗口,包含了过去10分钟的所有event.可以用以下代码指定:
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- Session Window
session window的特点:session window 元素不会重复,没有固定的开始/结束时间。当一段时间内没有接受到元素时,窗口会关闭。可以定义一个session gap,当到达session gap期限后当前session会关闭,后续元素分配给新的session
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
或
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
- global window
global window 会将相同类型key分配给同一个global window. global window仅仅在自定义触发器时才有用,否则不执行任何计算。因为global window没有结束。
2.2 Window Function
window function 用户窗口的计算
name | 作用描述 |
---|---|
ReduceFunction | 输入两个参数,返回一个同类型结果 |
AggregateFunction | 聚合计算 |
FoldFunction | 给定一个初始值,所有元素依次增量掉用 |
ProcessWindowFunction |
2.3 Trigger
Trigger决定了一个窗口何时触发计算或清楚,如果你不指定自定义的Trigger,那么每一个window都会分配一个默认的Trigger。
Trigger接口有以下几个方法:
- onElement() 经过窗口的每个element都会调用
- onEventTime() eventTime的计时器触发时调用
- onProcessingTime() processTime的计时器触发时调用
- onMerge()
- clear() 方法执行删除操作时调用
Trigger的返回结果有以下几个:
- continue 不做任何操作
- fire 触发计算
- purge 清除窗口和窗口中的数据
- fire+purge 触发计算并清理窗口数据
注意:
- purge只是移除了窗口内容,触发计算的状态和元数据信息还是会保留。
- GlobalWindow 默认的触发器为NeverTrigger,永远不触发,所以要使用globalwindow时必须自定义触发器
Flink内置触发器有以下几个:
- EventTimeTrigger 事件时间触发
- ProcessTimeTrigger 处理事件触发
- CountTrigger 计数触发
- PurgingTrigger
- DeltaTrigger
2.4 Evictor
evictor可以理解为“驱逐器”,在Trigger触发之后,在窗口计算之前,用于剔除窗口中不要的元素,类似一个filter。
EvicTor接口包含了2个方法:
- ecivtBefore 窗口函数执行前的逻辑
- ecivAfter 窗口函数执行后的逻辑
Flink提供了三种内置的evictor实现: - TimeEvicTor 剔除元素中时间戳<(最大时间戳-interval)的值
- CountEvicTor 剔除
- DeltaEvicTor 计算窗口中最后一个元素和其余元素的差值,剔除大于或等于delta值的数据。
注意:
- evictor会阻止预聚合操作,所以在计算之前必须将所以元素发送给evictor.
2.5 Allowed lateness
有一种特殊的处理方法,就是当窗口结束时间已过,但是还是有数据发送过来,如何处理这种延迟数据呢?Flink提供了allowedLateness允许最大延迟。默认延迟值是0,当延迟时间也到时,watermarks标记不会再有比watermark还小的数据发送来了,这时会触发计算。
注意:GlobalWindow没有数据被认为是延迟的,因为global窗口的最后时间戳为Long.MAX_VALUE。
2.6 side ouput
可以用Flink提供的side output方法计算延迟或者丢弃的数据,例子如下:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
2.7 连续窗口操作
举个例子:第一个流计算总数,第二个流计算topN
DataStream<Integer> input = ...;
DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());
DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());