结果在一个空RDD
问题描述:
转型我有一个RDD(combinerRDD),关于这一点我在下面加改造结果在一个空RDD
JavaPairRDD<String, Integer> counts = combinerRDD.mapToPair(
new PairFunction<Tuple2<LongWritable, Text>, String, Integer>() {
String filename;
Integer count;
Message message;
@Override
public Tuple2<String, Integer> call(Tuple2<LongWritable, Text> tuple) throws Exception {
xlhrCount = 0;
filename = "";
filename = "New_File";
for (JobStep js : message.getJobStep()) {
if (js.getStepName().equals(StepName.NEW_STEP)) {
count += 1;
}
}
return new Tuple2<String, Integer>(filename, xlhrCount);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) throws Exception {
return (count1 + count2);
}
}
);
我的问题是,当combinerRDD
里面有一些数据,我得到正确的结果。但当combinerRDD
是空的写入HDFS的结果只是一个空的_SUCCESS文件。我期待2个文件在空RDD上转换,例如_SUCCESS和空白部分00000文件。我对吗?我应该得到多少个输出文件。
我之所以问这是因为我在2个集群中得到了不同的结果,在集群1上运行的代码导致了_SUCCESS文件,并且集群2导致了_SUCCESS和空的部分00000。我现在很困惑。结果是否依赖于任何群集设置?
注意:我在newRDD.leftOuterJoin(combinerRDD)
上做了一个左连接,这没有给出结果(当combinerRDD只有_SUCCESS时)并且newRDD包含值。
答
好的,所以我找到了解决方案。我正在使用spark-1.3.0,它有以下问题:ie。一个emptyRDD的左外连接会给出空结果。
https://issues.apache.org/jira/browse/SPARK-9236
我创建对空RDD象下面这样:
JavaRDD<Tuple2<LongWritable, Text>> emptyRDD = context.emptyRDD();
myRDD = JavaPairRDD.fromJavaRDD(emptyRDD);
现在使用:
List<Tuple2<LongWritable, Text>> data = Arrays.asList();
JavaRDD<Tuple2<LongWritable, Text>> emptyRDD = context.parallelize(data);
myRDD = JavaPairRDD.fromJavaRDD(emptyRDD);
它现在,即我的RDD是没有更多的空。修正版本有: 1.3.2,1.4.2,1.5.0(参考上面的链接)。