Spark(2.2):deserialise使用结构化流式处理来自卡夫卡的节俭记录
问题描述:
我是新来的火花。我使用结构化流式传输从kafka读取数据。Spark(2.2):deserialise使用结构化流式处理来自卡夫卡的节俭记录
我可以在Scala中使用此代码读取数据:
val data = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.option("startingOffsets", startingOffsets)
.load()
我在值列数据是节俭的记录。 Streaming API以二进制格式提供数据。我看到将数据转换为字符串或json的示例,但我无法找到任何有关如何将数据反序列化到Thrift的示例。
我该如何做到这一点?
答
我在databricks的网站上发现了这个博客。它展示了如何利用Spark SQL的API来消费和转换来自Apache Kafka的复杂数据流。
有解释UDF如何被用来解串器行的部分:
object MyDeserializerWrapper {
val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
MyDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""deserialize("topic1", value) AS message""")
我用java,所以只好写下面的示例UDF,以检查它怎么能在Java中被称为:
UDF1<byte[], String> mode = new UDF1<byte[], String>() {
@Override
public String call(byte[] bytes) throws Exception {
String s = new String(bytes);
return "_" + s;
}
};
现在我可以使用这个UDF在结构化流字数举例如下:
Dataset<String> words = df
//converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
// .selectExpr("CAST(value AS STRING)")
.select(callUDF("mode", col("value")))
.as(Encoders.STRING())
.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
对我来说,下一步是为节俭反序列化写一个UDF。我会尽快发布。