Spark - 使用Firehose从分区文件夹读取JSON

问题描述:

Kinesis firehose将文件(本例中为时间序列JSON)的持久性管理为由YYYY/MM/DD/HH分区的文件夹层次结构(下至24位编号的小时)...大。Spark - 使用Firehose从分区文件夹读取JSON

如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹并从所有的叶子json文件创建一个静态的Dataframe?数据帧阅读器是否有'选项'?

我的下一个目标是成为一个流式DF,其中Firehose将新文件保存到S3中自然成为使用Spark 2.0中新的结构化流式传输的数据帧的一部分。我知道这都是实验性的 - 希望有人在之前使用S3作为流文件源,数据按上述方式分配到文件夹中。当然会更喜欢直接使用Kinesis流,但在2.0上没有此连接器的日期,因此Firehose-> S3是临时的。 ND:我正在使用将S3挂载到DBFS中的数据块,但当然可以是EMR或其他Spark提供程序。如果可以共享一个例子,也很高兴看到一个笔记本。

干杯!

我可以读取嵌套的子文件夹并从所有叶JSON文件创建一个静态DataFrame吗? DataFrame阅读器是否有选项?

是的,正如你的目录结构是有规律的(YYYY/MM/DD/HH),你可以给直到使用通配符字符的叶节点的路径类似下面

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate 

val jsonDf = spark.read.format("json").json("base/path/*/*/*/*/*.json") 
/* Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json */ 

当然,宁愿直接的Kinesis流,但没有2.0的连接器上的日期,所以Firehose-> S3是临时的。

我可以看到有一个库为Kinesis integration with Spark Streaming。因此,您可以直接读取流数据并在不读取S3的情况下对其执行SQL操作。

groupId = org.apache.spark 
artifactId = spark-streaming-kinesis-asl_2.11 
version = 2.0.0 

示例代码星火流和SQL

import org.apache.spark.streaming.Duration 
import org.apache.spark.streaming.kinesis._ 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream 

val kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], 
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2) 

kinesisStream.foreachRDD { rdd => 

    // Get the singleton instance of SparkSession 
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() 
    import spark.implicits._ 

    // Convert RDD[String] to DataFrame 
    val jsonDf = rdd.toDF() // or rdd.toDF("specify schema/columns here") 

    // Create a temporary view with DataFrame 
    jsonDf.createOrReplaceTempView("json_data_tbl") 

    //As we have DataFrame and SparkSession object we can perform most 
    //of the Spark SQL stuff here 
} 
+1

嵌套的子文件夹在这里的答案会非常慢。它读取的方式是递归地在每个子文件夹上执行一个列表,这与性能差不多。 –

+0

是的,那就是如果你为静态s3读取。你可以试试第二种方法(直接处理'Kinesis'流)? – mrsrinivas

+1

我不是OP,我还没有流媒体需求,我只是希望在Stack Overflow中更清楚地看到这一点。我正在使用您记录的静态方法,但将其更改为使用更好的S3调用进行扫描,然后创建要并行处理的文件列表(最佳)或要传入的文件序列。 –

全面披露:我为Databricks工作,但我并不代表他们对堆栈溢出。

如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹并从所有叶子json文件创建一个静态数据框?数据帧阅读器是否有'选项'?

DataFrameReader支持加载序列。请参阅def json(paths: String*): DataFrame的文档。您可以指定序列,使用通配图案或编程构建它(推荐):

val inputPathSeq = Seq[String]("/mnt/myles/structured-streaming/2016/12/18/02", "/mnt/myles/structured-streaming/2016/12/18/03") 
val inputPathGlob = "/mnt/myles/structured-streaming/2016/12/18/*" 
val basePath = "/mnt/myles/structured-streaming/2016/12/18/0" 
val inputPathList = (2 to 4).toList.map(basePath+_+"/*.json") 

我知道这是所有的实验 - 希望有人以前使用S3作为流文件源,其中数据是如上所述分割成文件夹。当然会更喜欢直接使用Kinesis流,但在2.0上没有此连接器的日期,因此Firehose-> S3是临时的。

由于您使用DBFS,我要去承担S3桶,其中数据从流水流已经安装到DBFS。查看Databricks文档,如果您需要帮助mounting your S3 bucket to DBFS。一旦你上述的输入路径,你可以简单地将文件加载到一个静态或流数据帧:

静态

val staticInputDF = 
    spark 
    .read 
    .schema(jsonSchema) 
    .json(inputPathSeq : _*) 

staticInputDF.isStreaming 
res: Boolean = false 

val streamingInputDF = 
    spark 
    .readStream      // `readStream` instead of `read` for creating streaming DataFrame 
    .schema(jsonSchema)    // Set the schema of the JSON data 
    .option("maxFilesPerTrigger", 1) // Treat a sequence of files as a stream by picking one file at a time 
    .json(inputPathSeq : _*) 

streamingCountsDF.isStreaming 
res: Boolean = true 

这其中大部分是直接从Databricks documentation on Structured Streaming。甚至还有一个可直接导入到Databricks中的笔记本示例。

+0

如果使用通过DBFS安装的S3,嵌套全局比使用's3a'文件系统更快速吗? –

+0

我猜想使用glob不一定会保持顺序,但是使用文件序列(对于流式计算,这似乎很重要),是吗? –

+0

最后,流不会继续超过序列文件的原始列表,因此当它们到达时,它会拾取新文件,并继续流式传输在这里不起作用。 –