Spark的认识(三)
Spark的认识(三)
1、本文内容
-
1、掌握sparkSQL原理
-
2、掌握DataFrame和DataSet数据结构和使用方式
-
3、掌握sparksql代码开发
2、sparksql概述
2.1 sparksql前世今生
-
shark是专门为spark设计的大数据仓库系统
-
shark与hive代码兼容,同时它也依赖于spark的版本
-
后期由于hive的代码升级,spark的版本不断变化。
-
hive的mapreduce处理思想限制了shark的性能
-
后面慢慢把shark这个框架废弃了。把重点转移到sparksql上面
2.2 sparksql 是什么
-
Spark SQL is Apache Spark's module for working with structured data.
-
Sparksql是spark用来处理结构化数据的一个模块。它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
3、sparksql特性
-
1、易整合
-
将sql与spark应用程序混合使用,同时使用java、scala、python、R等不同的语言api去操作
-
-
2、统一的数据源访问
-
sparksql可以以一种相同的方式来访问任意的外部数据源
-
SparkSession.read.文件格式("该格式的文件路径")
-
-
3、兼容hive
-
可以通过sparksql去操作hivesql
-
-
4、支持标准的数据库连接
-
可以通过jdbc或者是odbc来操作数据库表中的数据
-
4、DataFrame
4.1 DataFrame是什么
DataFrame它的前身是schemaRDD,它是在spark1.3.0之后把schemaRDD改名为DataFrame, schemaRDD是直接继承自RDD,而DataFrame自己实现了RDD中的一些方法,你可以使用dataFrame调用rdd方法转换成一个RDD. 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化
4.2 RDD与DataFrame区别
DataFrame比RDD多了对结构化数据的描述信息,就是schema元信息
4.3 RDD与DataFrame优缺点
-
1、RDD
-
优点
-
1、编译时类型安全
-
编译时类型检查,就是看一下当前的数据类型是不是我们程序需要的数据类型
-
-
2、具有面向对象编程的特性
-
可以使用面向对象编程来操作rdd
-
-
-
缺点
-
1、序列化和反序列化性能开销很大
-
在进行分布式计算的时候,会涉及到数据大量的网络传输
-
RDD会把数据本身和数据结构信息首先都会进行序列化,然后在进行反序列化获取对象
-
-
2、频繁创建大量的对象会带来GC(垃圾回收)
-
-
-
2、DataFrame
-
DataFrame引入了schema和off-heap
-
优点
-
1、由于DataFrame引入了schema,解决了RDD中序列化和反序列性能开销很多这个缺点
-
后期再进行数据传输的时候,只需要序列化和反序列化数据内容本身就可以了,对于数据结构信息也就是schema可以省略掉。
-
-
2、由于DataFrame引入了off-heap(对象的创建不在jvm堆以内,直接使用操作系统层面上的内存),解决了RDD频繁创建大量的对象会带来GC(垃圾回收)这个缺点。
-
-
缺点
-
DataFrame引入了schema和off-heap分别解决了RDD的2个缺点,同时它也丢失rdd的优点
-
1、编译时类型不安全
-
2、不具有面向对象编程的特性
-
-
-
5、读取数据源创建DataFrame
5.1 读取文本文件创建DataFrame
//读取文本文件创建DataFrame val df1=spark.read.text("/person.txt") //打印schema df1.printSchema //展示数据 df1.show
5.2 读取json文件创建DataFrame
//读取json文件创建DataFrame val df2=spark.read.json("/people.json") //打印schema df2.printSchema //展示数据 df2.show
5.3 读取parquet列存储的文件创建DataFrame
//读取parquet文件创建DataFrame val df3=spark.read.parquet("/users.parquet") //打印schema df3.printSchema //展示数据 df3.show
6、DataFrame常用操作
6.1 DSL风格语法
-
就是dataFrama自己封装了一套api,可以通过这套api来操作dataFrame
val rdd1=sc.textFile("/person.txt") val rdd2=rdd1.map(_.split(" ")) case class Person(id:Int,name:String,age:Int) val rdd3=rdd2.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) val df=rdd3.toDF df.printSchema df.show df.select("name").show df.select($"name").show df.select(col("name")).show df.foreach(row =>println(row)) df.foreach(row =>println(row.getInt("id"))) df.foreach(row =>println(row.getString("name"))) df.foreach(row =>println(row.getAs[String]("name"))) df.foreach(row =>println(row.getAs[Int]("id"))) df.foreach(row =>println(row.getAs[Int](0))) df.foreach(row =>println(row.getAs[String](1))) //row.get字段类型(字段名称) //row.getAs[字段类型](字段名称) //row.getAs[字段类型](字段下标) 注意下标是从0开始 df.select($"age"+1).show df.filter($"age" >30).show df.filter($"age" >30).count df.groupBy("age").count.show
6.2 SQL风格语法
-
可以把一个DataFrame看成一张表,需要先把DataFrame注册成一张表
-
personDF.registerTempTable("person")
-
-
SparkSession.sql(sql语句)
spark.sql("select * from person").show spark.sql("select name from person").show spark.sql("select name,age from person").show spark.sql("select * from person where age >30").show spark.sql("select count(*) from person").show spark.sql("select * from person order by age desc").show
7、DataSet
7.1 什么是DataSet
DataSet是一个分布式的数据集合,它是支持强类型,在RDD每一行数据上加了一个类型约束,它是在spark1.6之后添加的新的接口。
7.2 DataSet与DataFrame互相转换
-
1、DataSet转换成DataFrame
-
val df=ds.toDF
-
-
2、DataFrame转换成DataSet
-
val ds=df.as[强类型]
-
-
补充:
-
val rdd1=dataFrame.rdd
-
val rdd2=dataset.rdd
-
7.3 创建DataSet
-
1、通过一个已经存在scala集合去构建
-
val ds=spark.createDataset(List(1,2,3,4,5))
-
val ds=List(1,2,3,4,5).toDS
-
-
2、通过一个已经存在的RDD去构建
-
val ds=spark.createDataset(sc.textFile("/person.txt"))
-
-
3、DataFrame转换成DataSet
-
val ds=df.as[强类型]
-
8、通过将RDD转化成DataFrame(利用了反射机制)
-
1、引入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.3</version> </dependency>
-
2、代码开发
package demo.sparksql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Column, DataFrame, SparkSession} //todo:将RDD转换成DataFrame--利用反射机制(就是去定义样例类) case class Person(id:Int,name:String,age:Int) object CaseClassSchema { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate() //2、创建SparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("warn") //3、读取数据创建RDD val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" ")) //4、将rdd与样例类进行关联 val personRDD: RDD[Person] = rdd1.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //5、获取dataFrame //手动导入隐式转换 import spark.implicits._ val personDF: DataFrame = personRDD.toDF //打印schema personDF.printSchema() //展示数据 personDF.show() //默认展示前20条数据,如果一个字符串大于20个字符,截取前20位 name=xxxxxxxxxxxxxxxxxxxxxxxxx //---------------------DSL风格语法-------------start personDF.show(1) println(personDF.first()) personDF.head(3).foreach(println) personDF.select("name").show() personDF.select($"name",$"id").show() personDF.select(new Column("name")).show() personDF.select($"name",$"age",$"age"+1).show() personDF.filter($"age" >30).show() println(personDF.filter($"age" >30).count()) personDF.groupBy("age").count().show() //---------------------DSL风格语法-------------end //---------------------SQL风格语法-------------start personDF.createTempView("person") spark.sql("select * from person").show() spark.sql("select name,id from person").show() spark.sql("select * from person where age >30").show() spark.sql("select * from person order by age desc").show() //---------------------SQL风格语法-------------end //关闭 sc.stop() spark.stop() } }
9、通过将RDD转化成DataFrame(使用StructType指定schema)
-
1、代码开发
package demo.sparksql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} //todo:将RDD转换成DataFrame----通过使用StructType类型来指定schema object StructTypeSchema { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate() //2、创建SparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("warn") //3、读取数据文件 val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(x=>x.split(" ")) //4、需要把rdd1与Row进行关联 val rowRDD: RDD[Row] = rdd1.map(x=>Row(x(0),x(1),x(2).toInt)) //5、StructType指定schema val schema = StructType( StructField("id", StringType, true) :: StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: Nil) val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema) dataFrame.printSchema() dataFrame.show() dataFrame.createTempView("person") spark.sql("select * from person ").show() spark.sql("select * from person where id=1").show() //关闭 sc.stop() spark.stop() } }
10、sparksql操作hivesql
-
1、引入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.1.3</version> </dependency>
-
2、代码开发
package demo.sparksql import org.apache.spark.sql.SparkSession //todo:利用sparksql操作hivesql object HiveSupport { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder() .appName("HiveSupport") .master("local[2]") .enableHiveSupport() //开启对hivesql支持 .getOrCreate() //2、利用sparkSession操作hivesql //2.1 创建一个hive表 spark.sql("create table user(id int,name string,age int) row format delimited fields terminated by ','") //2.2 加载数据到表中 spark.sql("load data local inpath './data/user.txt' into table user ") //2.3 查询 spark.sql("select * from user").show() spark.stop() } }
11、JDBC数据源
11.1 sparksql从mysql表中加载数据
-
1、代码开发
package demo.sparksql import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} //todo:利用sparksql加载mysql表中的数据 object DataFromMysql { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate() //2、读取mysql表中的数据 //2.1 定义url val url="jdbc:mysql://node1:3306/spark" //2.2 定义表名 val table="iplocation" //2.3 定义mysql服务属性 val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties) mysqlDF.printSchema() mysqlDF.show() mysqlDF.createTempView("iplocation") spark.sql("select * from iplocation").show() spark.sql("select * from iplocation where total_count>1000").show() spark.stop() } }
11.2 sparksql把结果数据写入到mysql表中
-
1、代码开发
package demo.sparksql import java.util.Properties import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} //todo:利用sparksql把结果数据写入到mysql表中 case class Student(id:Int,name:String,age:Int) object Data2Mysql { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder().appName("Data2Mysql").master("local[2]").getOrCreate() //2、创建SparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("warn") //3、读取文件数据 val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" ")) //4、rdd与样例类关联 val studentRDD: RDD[Student] = rdd1.map(x=>Student(x(0).toInt,x(1),x(2).toInt)) //5、将rdd转换成dataFrame import spark.implicits._ val studentDF: DataFrame = studentRDD.toDF() //5、打印schema studentDF.printSchema() studentDF.show() studentDF.createTempView("student") //把年龄大于30的用户过滤出来 val result: DataFrame = spark.sql("select * from student where age >30") //把结果数据写入到mysql表中 //2.1 定义url val url="jdbc:mysql://node1:3306/spark" //2.2 定义表名 val table="student200" //2.3 定义mysql服务属性 val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") //mode:表示数据插入的模式 //方法中可以传入4个参数 //overwrite:表示覆盖。如果表事先不存在,它会帮我们创建 //append : 表示追加,如果表事先不存在,它会帮我们创建 //ignore :表示忽略 ,如果表事先存在,它不会进行数据插入,直接忽略 //error :默认选项,表示如果表存在就报错 result.write.mode("ignore").jdbc(url,table,properties) //关闭 sc.stop() spark.stop() } }
-
2、打成jar包 集群运行
package demo.sparksql import java.util.Properties import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} //todo:利用sparksql把结果数据写入到mysql表中 case class Student(id:Int,name:String,age:Int) object Data2Mysql { def main(args: Array[String]): Unit = { //1、创建SparkSession val spark: SparkSession = SparkSession.builder().appName("Data2Mysql").getOrCreate() //2、创建SparkContext val sc: SparkContext = spark.sparkContext sc.setLogLevel("warn") //3、读取文件数据 val rdd1: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(" ")) //4、rdd与样例类关联 val studentRDD: RDD[Student] = rdd1.map(x=>Student(x(0).toInt,x(1),x(2).toInt)) //5、将rdd转换成dataFrame import spark.implicits._ val studentDF: DataFrame = studentRDD.toDF() //5、打印schema studentDF.printSchema() studentDF.show() studentDF.createTempView("student") //把年龄大于30的用户过滤出来 val result: DataFrame = spark.sql("select * from student where age >30") //把结果数据写入到mysql表中 //2.1 定义url val url="jdbc:mysql://node1:3306/spark" //2.2 定义表名 val table=args(1) //2.3 定义mysql服务属性 val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") //mode:表示数据插入的模式 //方法中可以传入4个参数 //overwrite:表示覆盖。如果表事先不存在,它会帮我们创建 //append : 表示追加,如果表事先不存在,它会帮我们创建 //ignore :表示忽略 ,如果表事先存在,它不会进行数据插入,直接忽略 //error :默认选项,表示如果表存在就报错 result.write.mode("append").jdbc(url,table,properties) //关闭 sc.stop() spark.stop() } }
-
3、集群提交运行
spark-submit --master spark://node1:7077 --class cn.itcast.sparksql.Data2Mysql --executor-memory 1g --total-executor-cores 4 --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar original-spark_class12-1.0-SNAPSHOT.jar /person.txt token
请关注公众号: