如何应用自定义数据格式/映射到每个事件加载整个数据集之前?

问题描述:

documentation的标准方法来读取数据流进入Apache的Spark是:如何应用自定义数据格式/映射到每个事件加载整个数据集之前?

events = spark.readStream \ 
    .format("json") \   # or parquet, kafka, orc... 
    .option() \     # format specific options 
    .schema(my_schema) \  # required 
    .load("path/to/data") 

但我需要清理一些数据的重新安排一些领域之前,我申请的模式,我希望会有一个

events = spark.readStream \ 
    .format("json") \   # or parquet, kafka, orc... 
    .option() \     # format specific options 
    .schema(my_schema) \  # required 
    **.map(custom_function)** # apply a custom function to the json object 
    .load("path/to/data") 

是否有一个有效的方式使用结构化数据流技术,为此在Apache的火花?

tl; dr简而言之,在加载数据集之前无法做到这一点。

我想到的唯一方法是将数据集加载为一组字符串,并使用一系列withColumnselect变换进行清理,这些变换实际上是您的.map(custom_function)

同意Jacek的回答。更具体地讲,你有两个选择:

  1. 应用输入数据的“超级模式”,然后操纵到您想要的模式。当(a)所有数据都是有效的JSON和(b)“超级模式”有点稳定时,这是最好的方法,例如,动态字段名称不存在。

  2. 作为文本阅读,使用json4s(或您选择的其他库)解析,根据需要进行操作。如果(a)任何输入行可能不是有效的JSON或(b)没有稳定的“超模式”,这是最好的方法。