// Create a DataFrame from an RDD of JSON-formatted strings (pure-Java version)
package com.bjsxt;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
 * Demonstrates creating DataFrames from RDDs of JSON-formatted strings and
 * joining them through Spark SQL.
 *
 * @author Administrator
 */
public class Java {
    /**
     * Builds two in-memory RDDs of JSON strings (name/age and name/score),
     * converts each to a DataFrame, registers both as temporary tables, and
     * prints the result of joining them on {@code name}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Everything after context creation runs inside try/finally so the
        // context is stopped even when parsing or the query throws.
        try {
            SQLContext sqlContext = new SQLContext(sc);

            // NOTE(review): the first two records mix single- and double-quoted
            // keys/values; this relies on the JSON reader accepting single
            // quotes -- confirm for the Spark version in use.
            List<String> nameJson = Arrays.asList(
                "{'name':'zhangsan','age':\"18\"}",
                "{'name':\"lisi\",\"age\":\"19\"}",
                "{\"name\":\"wangwu\",\"age\":\"20\"}"
            );
            JavaRDD<String> nameRDD = sc.parallelize(nameJson);

            List<String> scoreJson = Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                "{\"name\":\"lisi\",\"score\":\"200\"}",
                "{\"name\":\"wangwu\",\"score\":\"300\"}"
            );
            JavaRDD<String> scoreRDD = sc.parallelize(scoreJson);

            // Each RDD element is one JSON record.
            DataFrame namedf = sqlContext.read().json(nameRDD);
            namedf.show();
            DataFrame scoredf = sqlContext.read().json(scoreRDD);
            scoredf.show();

            // Register both DataFrames as temporary tables so they can be
            // referenced by name in SQL.
            namedf.registerTempTable("name");
            scoredf.registerTempTable("score");

            // With a hand-written SQL query, the columns of the resulting
            // DataFrame are returned in the order of the selected fields.
            DataFrame result = sqlContext.sql("select name.name,name.age,score.score from name join score on name.name=score.name");
            result.show();
        } finally {
            sc.stop();
        }
    }
}