在Clojure中写Spark结构化流示例时出现错误
问题描述:
我想重写Clojure中的Spark结构化流示例。在Clojure中写Spark结构化流示例时出现错误
的例子是用Scala编写如下:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
(ns flambo-example.streaming-example
(:import [org.apache.spark.sql Encoders SparkSession Dataset Row]
[org.apache.spark.sql.functions]
))
(def spark
(->
(SparkSession/builder)
(.appName "sample")
(.master "local[*]")
.getOrCreate)
)
(def lines
(-> spark
.readStream
(.format "socket")
(.option "host" "localhost")
(.option "port" 9999)
.load
)
)
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap #(clojure.string/split % #" "))
))
上述代码导致以下例外。
;;由java.lang.IllegalArgumentException引发 ;;找不到匹配的方法:flatMap for class ;; org.apache.spark.sql.Dataset
我该如何避免错误?
答
您必须按照签名。 Java的Dataset
API提供的Dataset.flatMap
两种实现方式,一是这需要scala.Function1
def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
,第二个这需要星火自己o.a.s.api.java.function.FlatMapFunction
def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U]
前者是没有用处的,但是你应该能够使用后者。 RDD
API flambo
uses macros to create Spark friendly adapters可以通过flambo.api/fn
进行访问 - 我不确定这些是否可以直接与Datasets
一起使用,但如果需要,您应该可以调整它们。
由于您不能依赖隐式Encoders
您还必须提供与返回类型相匹配的显式编码器。
总体而言,你需要的东西左右:
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap f e)
))
其中f
实现FlatMapFunction
和e
是Encoder
。一个示例实现:
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap
(proxy [FlatMapFunction] []
(call [s] (.iterator (clojure.string/split s #" "))))
(Encoders/STRING))))
,但我想这是有可能找到一个更好的。
在实践中,我会避免输入Dataset
,并专注于DataFrame
(Dataset[Row]
)。