如何在Scala中应用Flink的简单过滤器
问题描述:
我使用的是旧版本的Flink。我升级到1.2.0,我有一些过滤器问题。如何在Scala中应用Flink的简单过滤器
我有记录的数据流,其工作得很好:
val logs: DataStream[Log] = env.addSource(new LogSource(
data, delay, factor))
// DISPLAY TUPLE IN CONSOLE
logs.print()
// EXECUTE SCRIPT
env.execute("stream")
我当然读这说明文档:
dataStream.filter { _ != 0 }
我试着像这样的一堆东西:
val cleanLogs = logs.filter { _.isComplete }
但我得到了以下错误:
类型不匹配,预计:的filterFunction [日志],实际:(任意)=>的
所以我没有看到文件,但此错误的链接。 有什么帮助吗?例子 ?
感谢
答
的问题是第一的StreamExecutionEnvironment
这导致了这个问题,像filter
基本功能,一个错误的进口。
然后,当我使用老版本的Flink时,我使用的是Flink 1.X中不再提供的类别LocalExecutionEnvironment
。
相反:StreamExecutionEnvironment.createLocalEnvironment(1)
什么是'isComplete'方法的签名? –
这不是一种方法,Log的第一个属性是一个布尔值:isComplete。它与Flink 0.10一起工作得很好,但它可能不再可能了? – ImbaBalboa
我无法真正重现您的问题。我唯一想到的是一些错误的进口产品。确保你正在导入'DataStream'和'StreamExecutionEnvironment'的scala版本。在scala中最好总是导入'org.apache.flink.streaming.api.scala._' –