将流数据集附加到Spark中的批处理数据集中

问题描述:

我们在Spark中有一个用例,我们想从我们的数据库中将历史数据加载到Spark并不断向Spark添加新的流数据,然后我们可以对整个最新数据集进行分析,最新的数据集。将流数据集附加到Spark中的批处理数据集中

据我所知,Spark SQL和Spark Streaming都不能将历史数据与流数据结合起来。然后我发现了Spark 2.0中的结构化流式处理,似乎是为了解决这个问题。但经过一些实验后,我仍然无法弄清楚。这里是我的代码:

SparkSession spark = SparkSession 
      .builder() 
      .config(conf) 
      .getOrCreate(); 

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); 

    // Load historical data from MongoDB 
    JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc); 


    // Create typed dataset with customized schema 
    JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...}); 
    Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class); 
    Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class)); 


    // ds listens to a streaming data source 
    Dataset<Row> ds = spark.readStream() 
      .format("socket") 
      .option("host", "127.0.0.1") 
      .option("port", 11111) 
      .load(); 

    // Create the typed dataset with customized schema 
    Dataset<JavaRecordForSingleTick> ds1 = ds 
      .as(Encoders.STRING()) 
      .flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() { 
     @Override 
     public Iterator<JavaRecordForSingleTick> call(String str) throws Exception { 
     ... 
     } 
    }, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class)); 


    // ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data 

    ds1 = ds1.union(df1); 
    StreamingQuery query = ds1.writeStream().format("console").start(); 
    query.awaitTermination(); 

我得到了一个错误“org.apache.spark.sql.AnalysisException:流媒体和批量DataFrames之间的联盟/不支持数据集;”当我union()两个数据集。

任何人都可以帮我吗?我会走错方向吗?

+0

在Spark 2.0中的结构化流是在Alpha中 - 很多东西还不支持。我想知道你是否不能使用有状态流。在有状态流媒体中,您可以使用历史数据引导您的状态,然后以您喜欢的方式添加流数据。有关详细信息,请参阅此[Databrick的博客帖子](https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html)。 –

+0

@GlennieHellesSindholt嗨Glennie,谢谢你的建议。我认为mapWithState()最适合用新流媒体数据替换/更新当前状态(键值对)。在我的使用案例中,我的RDD不是配对的关键值,也不需要更新旧数据。使用mapWithState()太多了吗? –

+0

我同意'mapWithState'不是明显的选择,如果你没有任何聚合,但如果你不需要历史数据,你为什么要在你的流? –

在支持这种类型的功能方面,我不能说MongoDB spark连接器,Google似乎没有多少关于它的信息。但是,Spark数据库生态系统中还有其他数据库。我涵盖了another answer中Spark数据库生态系统中的大部分内容。我不能确切地说哪个数据库容易地允许您查找的功能类型,但我知道SnappyDataMemSQL在该列表中。但是,您可能需要两种关系形式的数据。