如何将Spark DataFrame转换为Java中POJO的RDD
问题描述:
我对Spark很新。我想将DataFrame转换为POJO的RDD。像:如何将Spark DataFrame转换为Java中POJO的RDD
JavaRDD<POJOClass> data = df.toJavaRDD();
其中df是一个数据框。
df.show()
给出:
+---------+---------+---------+---------+
| A | B | C | D |
+---------+---------+---------+---------+
|603300042| 1025| 2|127000948|
|603303766| 1112| 2|127000364|
|603302691| 1184| 2|127000853|
|603303766| 1112| 2|127000364|
|603302691| 1184| 2|127000853|
|603303766| 1112| 2|127000364|
|603303787| 1041| 2|137000323|
|603306351| 1041| 2|137000468|
|603304009| 1307| 2|137000788|
|603303830| 1041| 2|137000012|
|603301119| 1002| 2|137000369|
|603301507| 1188| 2|137001568|
|603302168| 1041| 2|137000468|
+---------+---------+---------+---------+
我的POJO类如下:
public static class POJOClass {
public Long A;
public Integer B;
public Integer C;
public Long D;
}
我知道
JavaRDD<Row> data = df.toJavaRDD();
效果很好。但是,我有什么解决办法可以实现?
答
可以使用的数据集
public static class POJOClass implements serializable{
public Long A;
public Integer B;
public Integer C;
public Long D;
}
Dataset<POJOClass> pojos = context.read().json("/data.json").as(Encoders.bean(POJOClass.class));
答
试试这个(未测试):
JavaPairRDD<Long, POJOClass> jpRDD = jdbcDF.toJavaRDD().mapToPair(new PairFunction<Row, Long, POJOClass>() {
public Tuple2<Long, POJOClass> call(Row row) throws Exception {
POJOClass yourPojo = new POJOClass();
// Fill your pojo using row.get(index)
return new Tuple2<Long, POJOClass>(anIndex , (POJOClass) yourPojo);
}
});
答
您可以使用地图功能,如下图所示。
import org.apache.spark.api.java.function.Function;
JavaRDD<POJOClass> data = df.toJavaRDD().map(new Function<Row, POJOClass>() {
@Override
public POJOClass call(Row row) {
POJOClass pojo = new POJOClass();
pojo.setA(row.getLong(0));
pojo.setB(row.getInt(1));
pojo.setC(row.getInt(2));
pojo.setD(row.getLong(3));
return pojo;
}
});
Hi Noman,要求是只使用RDD。 –