读取json格式的RDD创建DF【java版纯代码】

package com.bjsxt;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * 读取json格式的RDD创建DF
 * @author Administrator
 *
 */

public class Java {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        SQLContext sqlContext=new SQLContext(sc);
        List<String> asList1 = Arrays.asList(
        "{'name':'zhangsan','age':\"18\"}",
        "{'name':\"lisi\",\"age\":\"19\"}",
        "{\"name\":\"wangwu\",\"age\":\"20\"}"
        );
        JavaRDD<String> nameRDD = sc.parallelize(asList1);
        List<String> asList2 = Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                "{\"name\":\"lisi\",\"score\":\"200\"}",
                "{\"name\":\"wangwu\",\"score\":\"300\"}"
        );
        JavaRDD<String> scoreRDD = sc.parallelize(asList2);
        DataFrame namedf = sqlContext.read().json(nameRDD);
        namedf.show();
        DataFrame scoredf = sqlContext.read().json(scoreRDD);
        scoredf.show();
        /**
         * 注册成临时表使用
         */
        namedf.registerTempTable("name");
        scoredf.registerTempTable("score");
        /**
         * 如果自己写的sql查询得到的DataFrame结果中的列会按照查询的字段返回
         */
        DataFrame result = sqlContext.sql("select name.name,name.age,score.score from name join score on name.name=score.name");
        result.show();
        sc.stop();
    }

}
读取json格式的RDD创建DF【java版纯代码】