如何将流数据写入S3?

如何将流数据写入S3?

问题描述:

我想使用Scala在Spark Streaming中将RDD[String]写入Amazon S3。这些基本上是JSON字符串。不知道如何更有效地做到这一点。 我找到了this post,其中使用了库spark-s3。这个想法是创建SparkContext然后SQLContext。在此之后,帖子的作者确实是这样的:如何将流数据写入S3?

myDstream.foreachRDD { rdd => 
     rdd.toDF().write 
       .format("com.knoldus.spark.s3") 
       .option("accessKey","s3_access_key") 
       .option("secretKey","s3_secret_key") 
       .option("bucket","bucket_name") 
       .option("fileType","json") 
       .save("sample.json") 
} 

什么是除了spark-s3其他的选择吗?是否可以将S3上的文件追加到流数据中?

你应该在Spark Documentation看看成dataframewriter模式的方法:

public DataFrameWriter mode(SaveMode saveMode)

指定当数据或表已经存在的行为。选项 包括: - SaveMode.Overwrite:覆盖现有数据。 - SaveMode.Append:追加数据。 - SaveMode.Ignore:忽略 操作(即no-op)。 - SaveMode.ErrorIfExists:默认选项, 在运行时抛出异常。

你可以试试像这样的事情追加 savemode。

rdd.toDF.write 
     .format("json") 
     .mode(SaveMode.Append) 
     .saveAsTextFile("s3://iiiii/ttttt.json"); 

Spark Append:

追加模式意味着节约了数据帧到数据源的情况下,如果数据 /表已经存在,则数据帧的内容被预期 被附加到现有的数据。

基本上可以通过使“格式”关键字方法

public DataFrameWriter format(java.lang.String source)

指定基础输出数据源选择要作为输出格式的格式。内置选项包括“parquet”,“json”等。

如为parquet

df.write().format("parquet").save("yourfile.parquet")

json

df.write().format("json").save("yourfile.json")


编辑:约S3凭据添加细节:

有两种不同的选择如何设置凭证,我们可以在SparkHadoopUtil.scala 与环境变量System.getenv("AWS_ACCESS_KEY_ID")spark.hadoop.foo属性看到:

SparkHadoopUtil.scala: 
if (key.startsWith("spark.hadoop.")) { 
      hadoopConf.set(key.substring("spark.hadoop.".length), value) 
} 

所以,你需要得到hadoopConfigurationjavaSparkContext.hadoopConfiguration()scalaSparkContext.hadoopConfiguration并设置

hadoopConfiguration.set("fs.s3.awsAccessKeyId", myAccessKey) 
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", mySecretKey) 
+0

我是否正确理解清洁选项是使用Spark的'saveAsTextFile',而不是使用'spark-s3'? – Lobsterrrr

+0

在您的第一个示例中,我应该在哪里放置Amazon访问密钥并通过? – Lobsterrrr

+0

1,@Lobsterrrr我认为'saveAsTextFile'是由spark api提供的。 2,并且@jbird注意到它实际上不是逐字追加的。添加大数据对我来说没有什么意义 - 更好的方法是创建分区,例如 – VladoDemcak

S3上的文件cannot be appended。 S3中的“追加”意味着用包含附加数据的新对象替换现有对象。

+0

这是一个好点:让我们假设文件已经存在于S3中。 Spark表示数据将被追加。这是否意味着spark会运行几个进程:类似于create_new_data_file - > get_existing_file - > merge_files - > replace_file_on_S3?它对我来说看起来不像原子操作。如果smth出问题了 - 会发生什么? –