Spark的认识(三)

Spark的认识(三)

1、本文内容

  • 1、掌握sparkSQL原理

  • 2、掌握DataFrame和DataSet数据结构和使用方式

  • 3、掌握sparksql代码开发

2、sparksql概述

2.1 sparksql前世今生

  • shark是专门为spark设计的大数据仓库系统

  • shark与hive代码兼容,同时它也依赖于spark的版本

  • 后期由于hive的代码升级,spark的版本不断变化。

  • hive的mapreduce处理思想限制了shark的性能

  • 后面慢慢把shark这个框架废弃了。把重点转移到sparksql上面

2.2 sparksql 是什么

  • Spark SQL is Apache Spark's module for working with structured data.

  • Sparksql是spark用来处理结构化数据的一个模块。它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

3、sparksql特性

  • 1、易整合

    • 将sql与spark应用程序混合使用,同时使用java、scala、python、R等不同的语言api去操作

  • 2、统一的数据源访问

    • sparksql可以以一种相同的方式来访问任意的外部数据源

    • SparkSession.read.文件格式("该格式的文件路径")

  • 3、兼容hive

    • 可以通过sparksql去操作hivesql

  • 4、支持标准的数据库连接

    • 可以通过jdbc或者是odbc来操作数据库表中的数据

4、DataFrame

4.1 DataFrame是什么

DataFrame它的前身是schemaRDD,它是在spark1.3.0之后把schemaRDD改名为DataFrame,
schemaRDD是直接继承自RDD,而DataFrame自己实现了RDD中的一些方法,你可以使用dataFrame调用rdd方法转换成一个RDD.
​
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化
​

4.2 RDD与DataFrame区别

DataFrame比RDD多了对结构化数据的描述信息,就是schema元信息

4.3 RDD与DataFrame优缺点

  • 1、RDD

    • 优点

      • 1、编译时类型安全

        • 编译时类型检查,就是看一下当前的数据类型是不是我们程序需要的数据类型

      • 2、具有面向对象编程的特性

        • 可以使用面向对象编程来操作rdd

    • 缺点

      • 1、序列化和反序列化性能开销很大

        • 在进行分布式计算的时候,会涉及到数据大量的网络传输

        • RDD会把数据本身和数据结构信息首先都会进行序列化,然后在进行反序列化获取对象

      • 2、频繁创建大量的对象会带来GC(垃圾回收)

  • 2、DataFrame

    • DataFrame引入了schema和off-heap

    • 优点

      • 1、由于DataFrame引入了schema,解决了RDD中序列化和反序列性能开销很多这个缺点

        • 后期再进行数据传输的时候,只需要序列化和反序列化数据内容本身就可以了,对于数据结构信息也就是schema可以省略掉。

      • 2、由于DataFrame引入了off-heap(对象的创建不在jvm堆以内,直接使用操作系统层面上的内存),解决了RDD频繁创建大量的对象会带来GC(垃圾回收)这个缺点。

    • 缺点

      • DataFrame引入了schema和off-heap分别解决了RDD的2个缺点,同时它也丢失rdd的优点

        • 1、编译时类型不安全

        • 2、不具有面向对象编程的特性

 

5、读取数据源创建DataFrame

5.1 读取文本文件创建DataFrame

//读取文本文件创建DataFrame
val df1=spark.read.text("/person.txt") 
//打印schema
df1.printSchema
//展示数据
df1.show

5.2 读取json文件创建DataFrame

//读取json文件创建DataFrame
val df2=spark.read.json("/people.json")
//打印schema
df2.printSchema
//展示数据
df2.show

5.3 读取parquet列存储的文件创建DataFrame

//读取parquet文件创建DataFrame
val df3=spark.read.parquet("/users.parquet")
//打印schema
df3.printSchema
//展示数据
df3.show

 

6、DataFrame常用操作

6.1 DSL风格语法

  • 就是dataFrama自己封装了一套api,可以通过这套api来操作dataFrame

val rdd1=sc.textFile("/person.txt")
val rdd2=rdd1.map(_.split(" "))
case class Person(id:Int,name:String,age:Int)
val rdd3=rdd2.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
val df=rdd3.toDF
​
df.printSchema
df.show
​
df.select("name").show
df.select($"name").show
df.select(col("name")).show
​
df.foreach(row =>println(row))
df.foreach(row =>println(row.getInt("id")))
df.foreach(row =>println(row.getString("name")))
df.foreach(row =>println(row.getAs[String]("name")))
df.foreach(row =>println(row.getAs[Int]("id")))
df.foreach(row =>println(row.getAs[Int](0)))
df.foreach(row =>println(row.getAs[String](1)))
//row.get字段类型(字段名称)
//row.getAs[字段类型](字段名称)
//row.getAs[字段类型](字段下标)   注意下标是从0开始
​
df.select($"age"+1).show
​
df.filter($"age" >30).show
df.filter($"age" >30).count
​
df.groupBy("age").count.show

6.2 SQL风格语法

  • 可以把一个DataFrame看成一张表,需要先把DataFrame注册成一张表

    • personDF.registerTempTable("person")

  • SparkSession.sql(sql语句)

    spark.sql("select * from person").show
    spark.sql("select name from person").show
    spark.sql("select name,age from person").show
    spark.sql("select * from person where age >30").show
    spark.sql("select count(*) from person").show
    spark.sql("select * from person order by age desc").show

 

7、DataSet

7.1 什么是DataSet

DataSet是一个分布式的数据集合,它是支持强类型,在RDD每一行数据上加了一个类型约束,它是在spark1.6之后添加的新的接口。

7.2 DataSet与DataFrame互相转换

  • 1、DataSet转换成DataFrame

    • val df=ds.toDF

  • 2、DataFrame转换成DataSet

    • val ds=df.as[强类型]

  • 补充:

    • val rdd1=dataFrame.rdd

    • val rdd2=dataset.rdd

 

7.3 创建DataSet

  • 1、通过一个已经存在scala集合去构建

    • val ds=spark.createDataset(List(1,2,3,4,5))

    • val ds=List(1,2,3,4,5).toDS

  • 2、通过一个已经存在的RDD去构建

    • val ds=spark.createDataset(sc.textFile("/person.txt"))

  • 3、DataFrame转换成DataSet

    • val ds=df.as[强类型]

 

8、通过将RDD转化成DataFrame(利用了反射机制)

  • 1、引入依赖

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.1.3</version>
            </dependency>
    
  • 2、代码开发

    package demo.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    //todo:将RDD转换成DataFrame--利用反射机制(就是去定义样例类)
    case class Person(id:Int,name:String,age:Int)
    
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
        //1、创建SparkSession
         val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
    
        //2、创建SparkContext
        val sc: SparkContext = spark.sparkContext
         sc.setLogLevel("warn")
    
        //3、读取数据创建RDD
         val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" "))
    
        //4、将rdd与样例类进行关联
          val personRDD: RDD[Person] = rdd1.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
    
        //5、获取dataFrame
         //手动导入隐式转换
          import spark.implicits._
          val personDF: DataFrame = personRDD.toDF
          //打印schema
          personDF.printSchema()
         //展示数据
          personDF.show()  //默认展示前20条数据,如果一个字符串大于20个字符,截取前20位  name=xxxxxxxxxxxxxxxxxxxxxxxxx
       //---------------------DSL风格语法-------------start
          personDF.show(1)
          println(personDF.first())
          personDF.head(3).foreach(println)
    
         personDF.select("name").show()
         personDF.select($"name",$"id").show()
         personDF.select(new Column("name")).show()
    
        personDF.select($"name",$"age",$"age"+1).show()
        personDF.filter($"age" >30).show()
        println(personDF.filter($"age" >30).count())
        personDF.groupBy("age").count().show()
        //---------------------DSL风格语法-------------end
    
        //---------------------SQL风格语法-------------start
        personDF.createTempView("person")
        spark.sql("select * from person").show()
        spark.sql("select name,id from person").show()
        spark.sql("select * from person where age >30").show()
        spark.sql("select * from person order by age desc").show()
    
        //---------------------SQL风格语法-------------end
    
        //关闭
          sc.stop()
          spark.stop()
      }
    }
    
    

9、通过将RDD转化成DataFrame(使用StructType指定schema)

  • 1、代码开发

    package demo.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    //todo:将RDD转换成DataFrame----通过使用StructType类型来指定schema
    object StructTypeSchema {
      def main(args: Array[String]): Unit = {
         //1、创建SparkSession
          val spark: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()
    
         //2、创建SparkContext
           val sc: SparkContext = spark.sparkContext
          sc.setLogLevel("warn")
    
        //3、读取数据文件
          val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(x=>x.split(" "))
    
        //4、需要把rdd1与Row进行关联
         val rowRDD: RDD[Row] = rdd1.map(x=>Row(x(0),x(1),x(2).toInt))
    
        //5、StructType指定schema
        val schema =
          StructType(
              StructField("id", StringType, true) ::
              StructField("name", StringType, false) ::
              StructField("age", IntegerType, false) :: Nil)
    
    
          val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema)
    
          dataFrame.printSchema()
          dataFrame.show()
    
          dataFrame.createTempView("person")
          spark.sql("select * from person ").show()
          spark.sql("select * from person where id=1").show()
    
         //关闭
         sc.stop()
         spark.stop()
    
      }
    }
    
    

10、sparksql操作hivesql

  • 1、引入依赖

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.1.3</version>
            </dependency>
    
  • 2、代码开发

    package demo.sparksql
    
    import org.apache.spark.sql.SparkSession
    
    //todo:利用sparksql操作hivesql
    object HiveSupport {
      def main(args: Array[String]): Unit = {
         //1、创建SparkSession
          val spark: SparkSession = SparkSession.builder()
                                                .appName("HiveSupport")
                                                .master("local[2]")
                                                .enableHiveSupport() //开启对hivesql支持
                                                .getOrCreate()
    
        //2、利用sparkSession操作hivesql
           //2.1 创建一个hive表
             spark.sql("create table user(id int,name string,age int) row format delimited fields terminated by ','")
    
           //2.2 加载数据到表中
             spark.sql("load data local inpath './data/user.txt' into table user ")
    
           //2.3 查询
            spark.sql("select * from user").show()
    
    
         spark.stop()
      }
    }
    
    

11、JDBC数据源

11.1 sparksql从mysql表中加载数据

  • 1、代码开发

    package demo.sparksql
    
    import java.util.Properties
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql加载mysql表中的数据
    object DataFromMysql {
      def main(args: Array[String]): Unit = {
          //1、创建SparkSession
         val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    
    
         //2、读取mysql表中的数据
            //2.1 定义url
                val url="jdbc:mysql://node1:3306/spark"
           //2.2 定义表名
                val table="iplocation"
           //2.3 定义mysql服务属性
                val properties = new Properties()
                properties.setProperty("user","root")
                properties.setProperty("password","123456")
    
         val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties)
    
           mysqlDF.printSchema()
           mysqlDF.show()
    
           mysqlDF.createTempView("iplocation")
           spark.sql("select * from iplocation").show()
           spark.sql("select * from iplocation where total_count>1000").show()
    
    
         spark.stop()
      }
    }
    
    

11.2 sparksql把结果数据写入到mysql表中

  • 1、代码开发

    package demo.sparksql
    
    import java.util.Properties
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql把结果数据写入到mysql表中
    
    case class Student(id:Int,name:String,age:Int)
    object Data2Mysql {
      def main(args: Array[String]): Unit = {
          //1、创建SparkSession
          val spark: SparkSession = SparkSession.builder().appName("Data2Mysql").master("local[2]").getOrCreate()
    
         //2、创建SparkContext
          val sc: SparkContext = spark.sparkContext
         sc.setLogLevel("warn")
    
        //3、读取文件数据
          val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" "))
    
        //4、rdd与样例类关联
          val studentRDD: RDD[Student] = rdd1.map(x=>Student(x(0).toInt,x(1),x(2).toInt))
    
        //5、将rdd转换成dataFrame
           import  spark.implicits._
           val studentDF: DataFrame = studentRDD.toDF()
    
        //5、打印schema
          studentDF.printSchema()
          studentDF.show()
    
          studentDF.createTempView("student")
           //把年龄大于30的用户过滤出来
          val result: DataFrame = spark.sql("select * from student where age >30")
    
           //把结果数据写入到mysql表中
               //2.1 定义url
                 val url="jdbc:mysql://node1:3306/spark"
              //2.2 定义表名
                val table="student200"
             //2.3 定义mysql服务属性
                val properties = new Properties()
                properties.setProperty("user","root")
                properties.setProperty("password","123456")
       //mode:表示数据插入的模式
            //方法中可以传入4个参数
              //overwrite:表示覆盖。如果表事先不存在,它会帮我们创建
             //append  : 表示追加,如果表事先不存在,它会帮我们创建
             //ignore :表示忽略 ,如果表事先存在,它不会进行数据插入,直接忽略
             //error  :默认选项,表示如果表存在就报错
        result.write.mode("ignore").jdbc(url,table,properties)
    
        //关闭
          sc.stop()
          spark.stop()
    
      }
    }
    
  • 2、打成jar包 集群运行

    package demo.sparksql
    
    import java.util.Properties
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql把结果数据写入到mysql表中
    
    case class Student(id:Int,name:String,age:Int)
    object Data2Mysql {
      def main(args: Array[String]): Unit = {
          //1、创建SparkSession
          val spark: SparkSession = SparkSession.builder().appName("Data2Mysql").getOrCreate()
    
         //2、创建SparkContext
          val sc: SparkContext = spark.sparkContext
         sc.setLogLevel("warn")
    
        //3、读取文件数据
          val rdd1: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(" "))
    
        //4、rdd与样例类关联
          val studentRDD: RDD[Student] = rdd1.map(x=>Student(x(0).toInt,x(1),x(2).toInt))
    
        //5、将rdd转换成dataFrame
           import  spark.implicits._
           val studentDF: DataFrame = studentRDD.toDF()
    
        //5、打印schema
          studentDF.printSchema()
          studentDF.show()
    
          studentDF.createTempView("student")
           //把年龄大于30的用户过滤出来
          val result: DataFrame = spark.sql("select * from student where age >30")
    
           //把结果数据写入到mysql表中
               //2.1 定义url
                 val url="jdbc:mysql://node1:3306/spark"
              //2.2 定义表名
                val table=args(1)
             //2.3 定义mysql服务属性
                val properties = new Properties()
                properties.setProperty("user","root")
                properties.setProperty("password","123456")
       //mode:表示数据插入的模式
            //方法中可以传入4个参数
              //overwrite:表示覆盖。如果表事先不存在,它会帮我们创建
             //append  : 表示追加,如果表事先不存在,它会帮我们创建
             //ignore :表示忽略 ,如果表事先存在,它不会进行数据插入,直接忽略
             //error  :默认选项,表示如果表存在就报错
        result.write.mode("append").jdbc(url,table,properties)
    
        //关闭
          sc.stop()
          spark.stop()
    
      }
    }
    
    
  • 3、集群提交运行

    spark-submit --master spark://node1:7077 --class cn.itcast.sparksql.Data2Mysql --executor-memory 1g --total-executor-cores 4 --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar original-spark_class12-1.0-SNAPSHOT.jar /person.txt token 

请关注公众号:

Spark的认识(三)