SparkSQL随机DataFrame/DataSet数据源query查询用户数据(Java版/Scala版)
SparkSQL随机DataFrame/DataSet数据源query查询用户数据(Java版/Scala版)
SparkSQL入门小demo,主要操作是构造DataFrame/Dataset,以及通过它们去执行Sql
一、以下为Java版本的Demo
Java版本(DataSourceJava.java、App.java)
DataSourceJava.java
package top.it1002.spark.sql.income;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* @Author 王磊
* @Date 2018/12/11
* @ClassName DataSourceJava
* @Description java版生产用户随机收入Dataset
**/
public class DataSourceJava {
public Dataset<Row> getDataSource(){
// 获取用户列表
ArrayList<String> userList = getUserList();
// 生产用户随机月收入数据
List<Row> rows = new ArrayList<Row>();
for(int i = 0 ; i < userList.size() ; i++){
for(int month = 1 ; month <= 12 ; month++){
rows.add(RowFactory.create(userList.get(i), month, getRandIncome()));
}
}
// 创建Dataset数据列结构
ArrayList<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("month", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("income", DataTypes.IntegerType, true));
StructType schema = DataTypes.createStructType(structFields);
// 创建Dataset
SparkSession ss = getSparkSession();
return ss.createDataFrame(rows,schema);
}
/**
* 获取用户列表
* @return
*/
public ArrayList<String> getUserList(){
ArrayList<String> userList = new ArrayList<String>();
userList.add("Michael");
userList.add("Andy");
userList.add("Justin");
userList.add("Berta");
userList.add("Tom");
userList.add("Tomas");
userList.add("Bob");
return userList;
}
/**
* 获取100 - 300之间随机收入
* @return
*/
public int getRandIncome(){
return 100 + new Random().nextInt(200);
}
/**
* 获取sparkSession对象
* @return
*/
public SparkSession getSparkSession(){
return SparkSession.builder().appName("ss").master("local[3]").getOrCreate();
}
}
App.java
package top.it1002.spark.sql.income;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
/**
* @Author 王磊
* @Date 2018/12/11
* @ClassName App
* @Description Java版SparkSql实现简单查询
**/
public class App {
public static void main(String[] args) {
// 获取数据源
Dataset<Row> sourceData = new DataSourceJava().getDataSource();
// SparkSQL查询
Dataset<Row> avgTable = sourceData.groupBy("name").avg("income").orderBy("avg(income)");
// 数据显示
avgTable.show();
}
}
二、以下为Scala版本的Demo
Scala版本(DataSourceScala.scala、APP.scala)
DataSourceScala.scala
package top.it1002.spark.sql.income
import java.util
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* @Author 王磊
* @Date 2018/12/11
* @ClassName DataSourceJava
* @Description Scala版生产用户随机收入DataFrame
**/
class DataSourceScala {
def getDataSource = {
// 获取用户列表
val userList = getUserList
// 获取用户数
val peoples = userList.length
// 创建行数据容器列表
val rows = new java.util.ArrayList[Row]()
// 循环向容器内装入行数据
for(pindex <- 0 until peoples){
for(month <- 1 to 12){
rows.add(Row(userList(pindex), month, getRandIncome))
}
}
// 创建列数据结构容器
val field = new util.ArrayList[StructField]()
// 追加列数据结构对象
field.add(StructField("name", DataTypes.StringType,true))
field.add(StructField("month", DataTypes.IntegerType,true))
field.add(StructField("income", DataTypes.IntegerType,true))
// 创建列数据类型对象
val schema = StructType(field)
// 获取sparkSession
val sess = getSparkSession
// 通过行列表数据与数据类型对象创建DataFrame
val df = sess.createDataFrame(rows, schema)
// 返回DataFrame
df
}
/**
* 创建SparkSession对象
* @return
*/
def getSparkSession = SparkSession.builder().master("local[3]").appName("test").getOrCreate()
/**
* 返回用户列表
* @return
*/
def getUserList = List("Michael","Andy","Justin","Berta","Tom","Tomas","Bob")
/**
* 生成100 - 300之间的随机整数
* @return
*/
def getRandIncome = 100 + new util.Random().nextInt(200)
}
APP.scala
package top.it1002.spark.sql.income
/**
* @Author 王磊
* @Date 2018/12/11
* @ClassName App
* @Description Scala版SparkSql实现简单查询
**/
object APP {
def main(args: Array[String]): Unit = {
// 获取数据源
val sourceData = new DataSourceScala().getDataSource
// 求每个用户月均收入,升序排序
val avgTable = sourceData.groupBy("name").avg("income").orderBy("avg(income)")
// 数据展示
avgTable.show()
}
}
三、执行效果
后续会更新java/scala版本相关小例子