如何在Scala中应用Flink的简单过滤器

问题描述:

我使用的是旧版本的Flink。我升级到1.2.0,我有一些过滤器问题。如何在Scala中应用Flink的简单过滤器

我有记录的数据流,其工作得很好:

val logs: DataStream[Log] = env.addSource(new LogSource(
     data, delay, factor)) 

// DISPLAY TUPLE IN CONSOLE 
logs.print() 

// EXECUTE SCRIPT 
env.execute("stream") 

我当然读这说明文档:

dataStream.filter { _ != 0 } 

我试着像这样的一堆东西:

val cleanLogs = logs.filter { _.isComplete } 

但我得到了以下错误:

类型不匹配,预计:的filterFunction [日志],实际:(任意)=>的

所以我没有看到文件,但此错误的链接。 有什么帮助吗?例子 ?

感谢

+0

什么是'isComplete'方法的签名? –

+0

这不是一种方法,Log的第一个属性是一个布尔值:isComplete。它与Flink 0.10一起工作得很好,但它可能不再可能了? – ImbaBalboa

+1

我无法真正重现您的问题。我唯一想到的是一些错误的进口产品。确保你正在导入'DataStream'和'StreamExecutionEnvironment'的scala版本。在scala中最好总是导入'org.apache.flink.streaming.api.scala._' –

的问题是第一的StreamExecutionEnvironment这导致了这个问题,像filter基本功能,一个错误的进口。

然后,当我使用老版本的Flink时,我使用的是Flink 1.X中不再提供的类别LocalExecutionEnvironment

相反:StreamExecutionEnvironment.createLocalEnvironment(1)