一起学习Spark(六)结构化流Structured Streaming编程指南(2)-窗口函数

本篇主要内容是Spark Structured Streaming实现事件时间的窗口操作。

滑动事件时间窗口的聚合操作对于Structured Streaming非常简单,与分组聚合非常相似。在分组聚合中,会按照用户的指定的一个或多个列进行分组,再为用户指定的分组列中的每个惟一值维护聚合值(例如计数),对于基于窗口的聚合,为每一个事件时间所在的窗口维护聚合值。让我们用一个例子来理解它。

比如现在我们需要每隔5分钟对最近10分钟的输入流数据进行word count,也就是在12:10时计算12:00-12:10的数据,在12:15计算12:05-12:15的数据。请注意,12:00 - 12:10是指在12:00之后但12:10之前到达的数据。12:07收到的一个词。这个单词应该对应于两个windows 12:00 - 12:10和12:05 - 12:15的计数。因此计数将由分组键(即单词)和窗口(可以从事件时间计算)同时索引。

结果表如下图所示:

 

一起学习Spark(六)结构化流Structured Streaming编程指南(2)-窗口函数

由于这个窗口类似于分组,因此在代码中,可以使用groupBy()和window()操作来表示窗口化的聚合。伪代码如下:

import spark.implicits._

val words = ... //流式DataFrame 数据格式为:{ timestamp: Timestamp, word: String }

// 根据窗口和单词对数据进行分组,并计算每个组的计数
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

 处理延迟数据和水印

但现在有一种特殊的情况,因为执行的窗口是基于事件时间的,如果有数据延迟到达系统会发生什么情况,比如一条数据是在12:04产生的,但可能因为网络等因素知道12:11才会被系统接收到。应用程序这时候需要更新12:00-12:10的旧计数结果,这在Structured Streaming中是内置的特性,自然会发生,Structured Streaming可以长期维护部分聚合的中间状态,以便后期数据可以正确更新旧windows的聚合,如下所示。

一起学习Spark(六)结构化流Structured Streaming编程指南(2)-窗口函数

但是对于一个需要运行数天的系统,因为内存的限制不可能无限制的存储所有的中间状态,这意味着系统需要知道何时可以将旧的聚合从内存中删除,也就意味着系统不再接收该聚合的延迟数据。为了确保这一点,在Spark2.1中引入了水印,它允许引擎自动跟踪数据中的当前事件时间,并尝试相应地清理旧状态。可以通过指定事件时间的列和以事件时间表示的数据预期延迟的阈值来定义查询的水印。比如定义的水印时间阀值是10分钟,那么晚于12:20的延迟数据将不再更新12:00-12:10这个窗口的结果。

代码的实现非常简单,定义一下水印阀值就可以了:

import spark.implicits._

val words = ... // 流式DataFrame 数据格式: { timestamp: Timestamp, word: String }

// 根据窗口和单词对数据进行分组,并计算每个组的计数
val windowedCounts = words
//设置阀值
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

在这个例子中,我们在列“timestamp”的值上定义查询的水印,并定义“10分钟”作为数据允许延迟的阈值。如果该查询以更新输出模式(稍后将在输出模式一节中讨论)运行,引擎将继续更新结果表中窗口的计数,直到该窗口的时间比水印的时间长,水印比列“timestamp”中的当前事件时间慢10分钟。这里有一个例子。 

一起学习Spark(六)结构化流Structured Streaming编程指南(2)-窗口函数

如图所示,横坐标表示的是当前系统时间,纵坐标是提交数据的事件时间,蓝色虚线表示的是当前系统时间下跟踪的最大事件时间,橙色实线代表的是事件延迟达到的阀值,也就是说在蓝色虚线上以及介于蓝色虚线与橙色实线之间的数据可以被正常处理,而在橙色实线右边达到的数据将会被舍弃。

一些接收器(例如文件)可能不支持更新模式所需的细粒度更新。为了使用它们,我们还支持Append模式,在该模式中,只有最后的计数被写入接收器。如下图所示。

请注意,在非流数据集上使用withWatermark是无效。由于水印不应该以任何方式影响任何批处理查询,我们将直接忽略它。

一起学习Spark(六)结构化流Structured Streaming编程指南(2)-窗口函数

与前面的更新模式类似,引擎维护每个窗口的中间计数。但是,部分计数没有更新到结果表,也没有写入接收器。引擎等待“10分钟”来计算延迟日期,然后删除窗口< watermark的中间状态,并将最终的计数追加到结果表/sink。也就是说在12:10时将不会计算12:00-12:10的数据结果,而是会等到水印的阀值时间后也就是在12:20再计算并写入结果表。

水印清除聚集状态的条件:

需要注意的是,要清除聚合查询中的状态,水印必须满足以下条件(从Spark 2.1.1开始,将来可能会发生更改)。

1.输出模式必须是Append或Update。Complete模式要求保留所有聚合数据,因此不能使用水印来删除中间状态。有关每个输出模式的语义的详细说明,请参见上篇博客。

2.聚合必须具有事件时间列或事件时间列上的窗口。

3.必须在与聚合中使用的时间戳列相同的列上调用withWatermark。例如,df.withWatermark("time", "1 min").groupBy("time2").count()在Append输出模式下无效,因为在与聚合列不同的列上定义了水印。

4.withWaterMark必须在聚合操作前调用,例如,df.groupBy("time").count().withWatermark("time", "1 min") 是无效的Append输出模式。