如何将流数据写入S3?
我想使用Scala在Spark Streaming中将RDD[String]
写入Amazon S3。这些基本上是JSON字符串。不知道如何更有效地做到这一点。 我找到了this post,其中使用了库spark-s3
。这个想法是创建SparkContext
然后SQLContext
。在此之后,帖子的作者确实是这样的:如何将流数据写入S3?
myDstream.foreachRDD { rdd =>
rdd.toDF().write
.format("com.knoldus.spark.s3")
.option("accessKey","s3_access_key")
.option("secretKey","s3_secret_key")
.option("bucket","bucket_name")
.option("fileType","json")
.save("sample.json")
}
什么是除了spark-s3
其他的选择吗?是否可以将S3上的文件追加到流数据中??
你应该在Spark Documentation看看成dataframewriter模式的方法:
public DataFrameWriter mode(SaveMode saveMode)
指定当数据或表已经存在的行为。选项 包括: - SaveMode.Overwrite:覆盖现有数据。 - SaveMode.Append:追加数据。 - SaveMode.Ignore:忽略 操作(即no-op)。 - SaveMode.ErrorIfExists:默认选项, 在运行时抛出异常。
你可以试试像这样的事情追加 savemode。
rdd.toDF.write
.format("json")
.mode(SaveMode.Append)
.saveAsTextFile("s3://iiiii/ttttt.json");
追加模式意味着节约了数据帧到数据源的情况下,如果数据 /表已经存在,则数据帧的内容被预期 被附加到现有的数据。
基本上可以通过使“格式”关键字方法
public DataFrameWriter format(java.lang.String source)
指定基础输出数据源选择要作为输出格式的格式。内置选项包括“parquet”,“json”等。
如为parquet
:
df.write().format("parquet").save("yourfile.parquet")
或json
:
df.write().format("json").save("yourfile.json")
编辑:约S3凭据添加细节:
有两种不同的选择如何设置凭证,我们可以在SparkHadoopUtil.scala
与环境变量System.getenv("AWS_ACCESS_KEY_ID")
或spark.hadoop.foo
属性看到:
SparkHadoopUtil.scala:
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
所以,你需要得到hadoopConfiguration
在javaSparkContext.hadoopConfiguration()或scalaSparkContext.hadoopConfiguration并设置
hadoopConfiguration.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", mySecretKey)
S3上的文件cannot be appended。 S3中的“追加”意味着用包含附加数据的新对象替换现有对象。
这是一个好点:让我们假设文件已经存在于S3中。 Spark表示数据将被追加。这是否意味着spark会运行几个进程:类似于create_new_data_file - > get_existing_file - > merge_files - > replace_file_on_S3?它对我来说看起来不像原子操作。如果smth出问题了 - 会发生什么? –
我是否正确理解清洁选项是使用Spark的'saveAsTextFile',而不是使用'spark-s3'? – Lobsterrrr
在您的第一个示例中,我应该在哪里放置Amazon访问密钥并通过? – Lobsterrrr
1,@Lobsterrrr我认为'saveAsTextFile'是由spark api提供的。 2,并且@jbird注意到它实际上不是逐字追加的。添加大数据对我来说没有什么意义 - 更好的方法是创建分区,例如 – VladoDemcak