如何应用自定义数据格式/映射到每个事件加载整个数据集之前?
问题描述:
从documentation的标准方法来读取数据流进入Apache的Spark是:如何应用自定义数据格式/映射到每个事件加载整个数据集之前?
events = spark.readStream \
.format("json") \ # or parquet, kafka, orc...
.option() \ # format specific options
.schema(my_schema) \ # required
.load("path/to/data")
但我需要清理一些数据的重新安排一些领域之前,我申请的模式,我希望会有一个
events = spark.readStream \
.format("json") \ # or parquet, kafka, orc...
.option() \ # format specific options
.schema(my_schema) \ # required
**.map(custom_function)** # apply a custom function to the json object
.load("path/to/data")
是否有一个有效的方式使用结构化数据流技术,为此在Apache的火花?
答
tl; dr简而言之,在加载数据集之前无法做到这一点。
我想到的唯一方法是将数据集加载为一组字符串,并使用一系列withColumn
或select
变换进行清理,这些变换实际上是您的.map(custom_function)
。
答
同意Jacek的回答。更具体地讲,你有两个选择:
应用输入数据的“超级模式”,然后操纵到您想要的模式。当(a)所有数据都是有效的JSON和(b)“超级模式”有点稳定时,这是最好的方法,例如,动态字段名称不存在。
作为文本阅读,使用
json4s
(或您选择的其他库)解析,根据需要进行操作。如果(a)任何输入行可能不是有效的JSON或(b)没有稳定的“超模式”,这是最好的方法。