(DataFrame部分)通过反射方式将非json格式的RDD转换成DataFrame【Java版纯代码】

package com.bjsxt;


import org.apache.hadoop.hive.ql.parse.HiveParser.rowFormat_return;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
 * 通过反射方式将非json格式的RDD转换成DataFrame
 * 注意:这种方式不推荐使用
 * @author Administrator
 *
 */
public class CreateDFFrameRDDWithReflect {
    

    public static void main(String[] args) {
        /**
         * 1.注意:自定义类访问级别必须是public
         * 2.RDD转成DataFrame会把自定义类中字段的名称按照ASCII码排序
         * 3.自定义类要实现序列化接口
         */
        SparkConf conf=new SparkConf().setAppName("CreateDFFrameRDDWithReflect").setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        SQLContext sqlContext=new SQLContext(sc);
        JavaRDD<String> lineRDD = sc.textFile("./person");
        JavaRDD<Person> personRDD = lineRDD.map(new Function<String,Person>() {
            @Override
            public Person call(String line) throws Exception {
                Person p=new Person();
                p.setId(line.split(",")[0]);
                p.setName(line.split(",")[1]);
                p.setAge(line.split(",")[2]);                
                return p;
            }            
        });
        /**
         * 传入进去person.class的时候,sqlContext是通过反射的方式创建DataFrame
         * 在底层通过反射的方式获得person的所有field,结合RDD本身,就生成了DataFrame
         */
        DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
        df.show();
        df.printSchema();
        df.registerTempTable("person");
        DataFrame sql=sqlContext.sql("select name,id,age from person where id=2");
        sql.show();
        /**
         * 将DataFrame转成JavaRDD
         * 注意:
         * 1.可以使用row.getint(0),row.getString(1)...通过下标获取返回row类型的数据,
         * 但是要注意列的顺序问题
         * 2.可以使用row.getAas("列名")来获取对应的值
         */
        JavaRDD<Row> javaRDD = df.javaRDD();
        JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {

            @Override
            public Person call(Row row) throws Exception {
                Person p=new Person();
                p.setId(row.get(0)+"");
                p.setName(row.getString(1));
                p.setAge(row.get(2)+"");
                
                return p;
            }
        });
        map.foreach(new VoidFunction<Person>() {
            
            @Override
            public void call(Person arg0) throws Exception {
                System.out.println(arg0);
                System.out.println("________________________________________");                
            }
        });    
    }
}

(DataFrame部分)通过反射方式将非json格式的RDD转换成DataFrame【Java版纯代码】