SparkSQL随机DataFrame/DataSet数据源query查询用户数据(Java版/Scala版)

SparkSQL随机DataFrame/DataSet数据源query查询用户数据(Java版/Scala版)

SparkSQL入门小demo,主要操作是构造DataFrame/Dataset,以及通过它们去执行Sql

一、以下为Java版本的Demo

Java版本(DataSourceJava.java、App.java)

DataSourceJava.java

package top.it1002.spark.sql.income;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @Author      王磊
 * @Date        2018/12/11
 * @ClassName   DataSourceJava
 * @Description java版生产用户随机收入Dataset
 **/
public class DataSourceJava {
    public Dataset<Row> getDataSource(){
        // 获取用户列表
        ArrayList<String> userList = getUserList();
        // 生产用户随机月收入数据
        List<Row> rows = new ArrayList<Row>();
        for(int i = 0 ; i < userList.size() ; i++){
            for(int month = 1 ; month <= 12 ; month++){
                rows.add(RowFactory.create(userList.get(i), month, getRandIncome()));
            }
        }
        // 创建Dataset数据列结构
        ArrayList<StructField> structFields  = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("month", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("income", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(structFields);
        // 创建Dataset
        SparkSession ss = getSparkSession();
        return ss.createDataFrame(rows,schema);
    }

    /**
     * 获取用户列表
     * @return
     */
    public ArrayList<String> getUserList(){
        ArrayList<String> userList = new ArrayList<String>();
        userList.add("Michael");
        userList.add("Andy");
        userList.add("Justin");
        userList.add("Berta");
        userList.add("Tom");
        userList.add("Tomas");
        userList.add("Bob");
        return userList;
    }

    /**
     * 获取100 - 300之间随机收入
     * @return
     */
    public int getRandIncome(){
        return 100 + new Random().nextInt(200);
    }

    /**
     * 获取sparkSession对象
     * @return
     */
    public SparkSession getSparkSession(){
        return SparkSession.builder().appName("ss").master("local[3]").getOrCreate();
    }
}

App.java

package top.it1002.spark.sql.income;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * @Author      王磊
 * @Date        2018/12/11
 * @ClassName   App
 * @Description Java版SparkSql实现简单查询
 **/
public class App {
    public static void main(String[] args) {
        // 获取数据源
        Dataset<Row> sourceData = new DataSourceJava().getDataSource();
        // SparkSQL查询
        Dataset<Row> avgTable = sourceData.groupBy("name").avg("income").orderBy("avg(income)");
        // 数据显示
        avgTable.show();
    }
}

二、以下为Scala版本的Demo

Scala版本(DataSourceScala.scala、APP.scala)

DataSourceScala.scala

package top.it1002.spark.sql.income

import java.util

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * @Author       王磊
  * @Date         2018/12/11
  * @ClassName    DataSourceJava
  * @Description  Scala版生产用户随机收入DataFrame
  **/

class DataSourceScala {

  def getDataSource = {
    // 获取用户列表
    val userList = getUserList
    // 获取用户数
    val peoples = userList.length
    // 创建行数据容器列表
    val rows = new java.util.ArrayList[Row]()
    // 循环向容器内装入行数据
    for(pindex <- 0 until peoples){
      for(month <- 1 to 12){
        rows.add(Row(userList(pindex), month, getRandIncome))
      }
    }
    // 创建列数据结构容器
    val field = new util.ArrayList[StructField]()
    // 追加列数据结构对象
    field.add(StructField("name", DataTypes.StringType,true))
    field.add(StructField("month", DataTypes.IntegerType,true))
    field.add(StructField("income", DataTypes.IntegerType,true))
    // 创建列数据类型对象
    val schema = StructType(field)
    // 获取sparkSession
    val sess = getSparkSession
    // 通过行列表数据与数据类型对象创建DataFrame
    val df = sess.createDataFrame(rows, schema)
    // 返回DataFrame
    df
  }

  /**
    * 创建SparkSession对象
    * @return
    */
  def getSparkSession = SparkSession.builder().master("local[3]").appName("test").getOrCreate()

  /**
    * 返回用户列表
    * @return
    */
  def getUserList = List("Michael","Andy","Justin","Berta","Tom","Tomas","Bob")

  /**
    * 生成100 - 300之间的随机整数
    * @return
    */
  def getRandIncome = 100 + new util.Random().nextInt(200)
}


APP.scala

package top.it1002.spark.sql.income

/**
  * @Author       王磊
  * @Date         2018/12/11
  * @ClassName    App
  * @Description  Scala版SparkSql实现简单查询
  **/

object APP {
  def main(args: Array[String]): Unit = {
    // 获取数据源
    val sourceData = new DataSourceScala().getDataSource
    // 求每个用户月均收入,升序排序
    val avgTable = sourceData.groupBy("name").avg("income").orderBy("avg(income)")
    // 数据展示
    avgTable.show()
  }

}

三、执行效果

SparkSQL随机DataFrame/DataSet数据源query查询用户数据(Java版/Scala版)

后续会更新java/scala版本相关小例子