从Java中的spark转换函数写入HDFS中的文件

问题描述:

我的问题类似于scala中已经回答的问题,也用于读取文件。从Java中的spark转换函数写入HDFS中的文件

Reading files dynamically from HDFS from within spark transformation functions

我知道使用它们将结果返回给司机和司机写入HDFS蓄电池。在我们的用例中,每个执行器的输出都很大,所以我正在寻找一种在Java转换中写入HDFS的方法。

谢谢!

终于找到了一个优雅的方式来实现这一点。 Hadoop的配置创建一个广播可变

Configuration configuration = JavaSparkContext.toSparkContext(context).hadoopConfiguration(); 
Broadcast<SerializableWritable<Configuration>> bc = context.broadcast(new SerializableWritable<Configuration>(configuration)); 

通过这个广播变量的变换或行动,并使用下面的代码片段获得的Hadoop文件系统:

FileSystem fileSystem = FileSystem.get(bc.getValue().value()); 

希望这帮助,如果别人是在同一船。

干杯!

JavaPairInputDStream<String, byte[]> input = KafkaUtils.createJDQDirectStream(ssc, String.class, byte[].class, 
     StringDecoder.class, DefaultDecoder.class, kafkaParams, Collections.singleton(topicName)); 

JavaPairDStream<String, byte[]> output = input.transformToPair(new Function<JavaPairRDD<String, byte[]>, JavaPairRDD<String, byte[]>>() { 
    public JavaPairRDD<String, byte[]> call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception { 
     JavaSparkContext sc = JavaSparkContext.fromSparkContext(stringJavaPairRDD.context()); 
     stringJavaPairRDD.saveAsTextFile("hdfs://"); 
     return stringJavaPairRDD; 
    } 
}); 
+0

感谢张的回复,感谢您发布一个方法来做到这一点。但在我的情况下,中间数据不是RDD,也不是流数据。 –

+0

我终于从你的答案中选择了一些想法,并能够得到解决方案。作为另一个答案发布......谢谢! –