六、Flink源码分析系列之--Window理解

1.Flink Window 概述

window是flink处理无穷流的核心,window将流拆分成有限大小的“桶”,在其上做运算。

1.1Window Api使用

首先我们看下windown的api如何使用,摘用官网的说明:

  • Keyed Windows
stream
       .keyBy(...)               <-  keyed versus non-keyed windows (是否keyby)
       .window(...)              <-  required: "assigner" (window assihner是必须的)
      [.trigger(...)]            <-  optional: "trigger" (else default trigger) (可选,不设置的话会指定一个默认的)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor) (可选,不设置的话就没有)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero) (可选,不设置为0)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data) (可选,不设置的话对于延迟数据没有side oupt)
       .reduce/aggregate/fold/apply()      <-  required: "function" (必须,具体funtion操作)
      [.getSideOutput(...)]      <-  optional: "output tag" (可选,side output 延迟数据处理)
  • Non-Keyed Windows
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
1.2 window 生命周期

当属于窗口的第一个元素到达,该窗口就会被创建,当时间(时间时间eventtime或处理时间process time)超过了结束时间+用户设置的allowedLeteness时间时,窗口将被删除。flink保证只删除基于时间类型的窗口,不会删除类似global类型的窗口。举个例子:基于eventtime创建一个间隔5分钟,允许1分钟延迟的非重叠窗口,当12:00分到12:05分的第一个元素到达时窗口被创建,当watermarks过了12:06窗口被删除。

1.3 有key窗口和无key窗口

在定义window之前必须指定你的流是否需要key操作。

  • 使用key,可以将event的任何字段作为key,使用了keyby后你的窗口计算会由多个task执行,具有相同类型的key会进入同一个task执行。
  • 不使用key,表示你的数据流不会被分成多条流执行,所以的窗口都会在同一个task中计算,也就是并行度parallesim 为1。

2.Window内部实现

2.1 window Assigner

当指定了是否使用key后,接下来需要定义window assigner, window assigner定义了如何分配元素给window, 使用key的流用window(),不使用key的流用windowAll()方法。
WindowAssigner负责将元素分配给一个或多个窗口,flink中包含几种预定义窗口分配器,比如tumbling window, sliding window ,session window, global window(出了global window其他均基于时间)。当然还可以通过继承WindowAssinger实现自定义窗口分配器。

  • Tumbling Window 翻滚窗口
    六、Flink源码分析系列之--Window理解
    翻滚窗口的特点是:固定大小,元素不会重复,可以用以下代码指定
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
可以指定偏移量,距离UTC-0的时间偏移
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
  • Sliding window 滑动窗口
    六、Flink源码分析系列之--Window理解
    滑动窗口的特点是:固定大小,元素可以属于多个窗口。例如,设置10分钟大小的窗口,5分钟滑动一次,意味着每隔5分钟会得到一个窗口,包含了过去10分钟的所有event.可以用以下代码指定:
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  • Session Window
    六、Flink源码分析系列之--Window理解
    session window的特点:session window 元素不会重复,没有固定的开始/结束时间。当一段时间内没有接受到元素时,窗口会关闭。可以定义一个session gap,当到达session gap期限后当前session会关闭,后续元素分配给新的session
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
或
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
}))
  • global window
    global window 会将相同类型key分配给同一个global window. global window仅仅在自定义触发器时才有用,否则不执行任何计算。因为global window没有结束。
2.2 Window Function

window function 用户窗口的计算

name 作用描述
ReduceFunction 输入两个参数,返回一个同类型结果
AggregateFunction 聚合计算
FoldFunction 给定一个初始值,所有元素依次增量掉用
ProcessWindowFunction
2.3 Trigger

Trigger决定了一个窗口何时触发计算或清楚,如果你不指定自定义的Trigger,那么每一个window都会分配一个默认的Trigger。

Trigger接口有以下几个方法:

  • onElement() 经过窗口的每个element都会调用
  • onEventTime() eventTime的计时器触发时调用
  • onProcessingTime() processTime的计时器触发时调用
  • onMerge()
  • clear() 方法执行删除操作时调用

Trigger的返回结果有以下几个:

  • continue 不做任何操作
  • fire 触发计算
  • purge 清除窗口和窗口中的数据
  • fire+purge 触发计算并清理窗口数据

注意:

  • purge只是移除了窗口内容,触发计算的状态和元数据信息还是会保留。
  • GlobalWindow 默认的触发器为NeverTrigger,永远不触发,所以要使用globalwindow时必须自定义触发器

Flink内置触发器有以下几个:

  • EventTimeTrigger 事件时间触发
  • ProcessTimeTrigger 处理事件触发
  • CountTrigger 计数触发
  • PurgingTrigger
  • DeltaTrigger
2.4 Evictor

evictor可以理解为“驱逐器”,在Trigger触发之后,在窗口计算之前,用于剔除窗口中不要的元素,类似一个filter。
EvicTor接口包含了2个方法:

  • ecivtBefore 窗口函数执行前的逻辑
  • ecivAfter 窗口函数执行后的逻辑
    Flink提供了三种内置的evictor实现:
  • TimeEvicTor 剔除元素中时间戳<(最大时间戳-interval)的值
  • CountEvicTor 剔除
  • DeltaEvicTor 计算窗口中最后一个元素和其余元素的差值,剔除大于或等于delta值的数据。

注意:

  • evictor会阻止预聚合操作,所以在计算之前必须将所以元素发送给evictor.
2.5 Allowed lateness

有一种特殊的处理方法,就是当窗口结束时间已过,但是还是有数据发送过来,如何处理这种延迟数据呢?Flink提供了allowedLateness允许最大延迟。默认延迟值是0,当延迟时间也到时,watermarks标记不会再有比watermark还小的数据发送来了,这时会触发计算。

注意:GlobalWindow没有数据被认为是延迟的,因为global窗口的最后时间戳为Long.MAX_VALUE。

2.6 side ouput

可以用Flink提供的side output方法计算延迟或者丢弃的数据,例子如下:

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
2.7 连续窗口操作

举个例子:第一个流计算总数,第二个流计算topN

DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());