(DataFrame部分)通过反射方式将非json格式的RDD转换成DataFrame【Java版纯代码】
package com.bjsxt;
import org.apache.hadoop.hive.ql.parse.HiveParser.rowFormat_return;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
* 通过反射方式将非json格式的RDD转换成DataFrame
* 注意:这种方式不推荐使用
* @author Administrator
*
*/
public class CreateDFFrameRDDWithReflect {
public static void main(String[] args) {
/**
* 1.注意:自定义类访问级别必须是public
* 2.RDD转成DataFrame会把自定义类中字段的名称按照ASCII码排序
* 3.自定义类要实现序列化接口
*/
SparkConf conf=new SparkConf().setAppName("CreateDFFrameRDDWithReflect").setMaster("local");
JavaSparkContext sc=new JavaSparkContext(conf);
SQLContext sqlContext=new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("./person");
JavaRDD<Person> personRDD = lineRDD.map(new Function<String,Person>() {
@Override
public Person call(String line) throws Exception {
Person p=new Person();
p.setId(line.split(",")[0]);
p.setName(line.split(",")[1]);
p.setAge(line.split(",")[2]);
return p;
}
});
/**
* 传入进去person.class的时候,sqlContext是通过反射的方式创建DataFrame
* 在底层通过反射的方式获得person的所有field,结合RDD本身,就生成了DataFrame
*/
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
df.show();
df.printSchema();
df.registerTempTable("person");
DataFrame sql=sqlContext.sql("select name,id,age from person where id=2");
sql.show();
/**
* 将DataFrame转成JavaRDD
* 注意:
* 1.可以使用row.getint(0),row.getString(1)...通过下标获取返回row类型的数据,
* 但是要注意列的顺序问题
* 2.可以使用row.getAas("列名")来获取对应的值
*/
JavaRDD<Row> javaRDD = df.javaRDD();
JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
@Override
public Person call(Row row) throws Exception {
Person p=new Person();
p.setId(row.get(0)+"");
p.setName(row.getString(1));
p.setAge(row.get(2)+"");
return p;
}
});
map.foreach(new VoidFunction<Person>() {
@Override
public void call(Person arg0) throws Exception {
System.out.println(arg0);
System.out.println("________________________________________");
}
});
}
}