Flink流媒体 - 在Windows应用功能

问题描述:

我是flink和流媒体的新手。我想将每个分区的某个函数应用到流的每个窗口(使用事件时间)。什么迄今为止我所做的是这样的:Flink流媒体 - 在Windows应用功能

val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

val inputStream = env.readTextFile("dataset.txt") 
     .map(transformStream(_)) 
     .assignAscendingTimestamps(_.eventTime) 
     .keyBy(_.id) 
     .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep)) 

def transformStream(input: String): EventStream = {...} 

case class EventStream(val eventTime: Long, val id: String, actualEvent: String) 

我想要做的是一般的功能应用到每个窗口批次中的每个分区,也许应用复杂的处理算法或类似的东西。我已经看到该方法适用于DataStream API,但我不明白它是如何工作的。在弗林克API它说,它是用来一样,在斯卡拉:

inputStream.apply { WindowFunction } 

谁能解释这是什么或如何使用它的应用方法是什么?斯卡拉的一个例子是可取的。应用方法做我想要的吗?

所以基本上有两种可能的方向来根据您想要做的计算类型。可以使用:fold/reduce/aggregate或更通用的,您已经提到 - apply。他们都适用于窗口的一个关键。

至于apply这是应用计算的非常通用的方法。最基本的版本(在斯卡拉)是:

def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R] 

其中函数有4个参数:

  • 窗口键(记住您正在使用的keyedStream)
  • 窗口(你可以提取即摹开始或从窗口的结束)
  • 已分配给这个特殊的窗口中的元素和关键
  • 一个收藏家,你应该发出的处理结果

必须记住,虽然这版本必须保持每个元素都处于状态,直到窗口发出。一个更好的内存性能解决方案将使用带有preAgreggator的版本,该版本在启动上述函数之前执行一些计算。

在这里,您可以看到与预先聚合一小片段:

val stream: DataStream[(String,Int)] = ... 

stream.keyBy(_._1) 
     .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap()))) 
     .apply((e1, e2) => (e1._1, e1._2 + e2._2), 
      (key, window, in, out: Collector[(String, Long, Long, Int)]) => { 
       out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum)) 
     }) 

,其对在会话窗口的一个关键的appearences。

所以基本上如果你不需要窗口的元信息,我会坚持如果他们足够的话,我会坚持到fold \ reduce \ aggregate。比考虑适用某种预先集合,如果这还不够,请查看最通用的apply

如需更完整的示例,您可以查看here

就我而言,您可以将map/flatmap/keyBy函数调用应用于有状态窗口数据val inputStream以更改数据。所以,如果你要创建

class DoSthWithYourStream {...}

,你需要定义你的方法和输入数据的限制,那么您可以创建另一个值:

val inputStreamChanged = inputStream .map(a => DoSthWithYourStream.Change2ColumnsIntoOne(a.change1st, a.change2nd), a) .flatMap(new DoSthWithYourStream())

Examples extending Java Classed and applying Scala classes into the stream using map/flapmap/key etc

如果你想使用CEP,那么我认为最好的选择是利用CEP pattern API

val pattern = Pattern.begin("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(inputStream, pattern) val result: DataStream[Alert] = patternStream.select(createAlert(_))

+0

问题是我想处理整个分区,map/flatMap函数调用在DataStream的每个元素上应用转换。 –

事实证明,它需要一点魔法斯卡拉。什么到目前为止,我这样做是:

val test: DataStream[Long] = inputStream.apply(processPartition(_,_,_,_)) 

    def processPartition(key: String, window: TimeWindow, 
         batch: Iterable[EventStream], 
         out: Collector[Long]): Unit = {..} 

从我的实验processPartition方法对整批即“键分区”(批次将只包含具有相同键的元素)应用的功能。我从Java API中获取了此方法的参数。如果有人能够详细阐述应用函数和它的工作原理,这将是有用的。