从Java中的spark转换函数写入HDFS中的文件
问题描述:
我的问题类似于scala中已经回答的问题,也用于读取文件。从Java中的spark转换函数写入HDFS中的文件
Reading files dynamically from HDFS from within spark transformation functions
我知道使用它们将结果返回给司机和司机写入HDFS蓄电池。在我们的用例中,每个执行器的输出都很大,所以我正在寻找一种在Java转换中写入HDFS的方法。
谢谢!
答
终于找到了一个优雅的方式来实现这一点。 Hadoop的配置创建一个广播可变
Configuration configuration = JavaSparkContext.toSparkContext(context).hadoopConfiguration();
Broadcast<SerializableWritable<Configuration>> bc = context.broadcast(new SerializableWritable<Configuration>(configuration));
通过这个广播变量的变换或行动,并使用下面的代码片段获得的Hadoop文件系统:
FileSystem fileSystem = FileSystem.get(bc.getValue().value());
希望这帮助,如果别人是在同一船。
干杯!
答
JavaPairInputDStream<String, byte[]> input = KafkaUtils.createJDQDirectStream(ssc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, kafkaParams, Collections.singleton(topicName));
JavaPairDStream<String, byte[]> output = input.transformToPair(new Function<JavaPairRDD<String, byte[]>, JavaPairRDD<String, byte[]>>() {
public JavaPairRDD<String, byte[]> call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(stringJavaPairRDD.context());
stringJavaPairRDD.saveAsTextFile("hdfs://");
return stringJavaPairRDD;
}
});
感谢张的回复,感谢您发布一个方法来做到这一点。但在我的情况下,中间数据不是RDD,也不是流数据。 –
我终于从你的答案中选择了一些想法,并能够得到解决方案。作为另一个答案发布......谢谢! –