Spark SQL 实战
http://www.sogou.com/labs/resource/q.php 测试数据下载
下载用户查询日志的语料库(下载该包SogouQ.reduced.zip)
创建存放用户查询日志内容的数据库
hive (default)> create database hive_sogou;
hive (default)> show databases;
OK
database_name
db_hive
db_hive2
default
hive_sogou
testdb
hive (default)> use hive_sogou;
OK
Time taken: 0.037 seconds
创建表
create table SogouQ1(id string,websesion string,word string,s_seq int,c_seq int,website string)
row format delimited fields terminated by '\t' lines terminated by '\n';
hive (hive_sogou)> show tables;
OK
tab_name
sogouq1
导入数据
load data local inpath '/home/hadoop/tools/hivefile/SogouQ.reduced' into table sogouq1;
Loading data to table hive_sogou.sogouq1
OK
Time taken: 4.338 seconds
查询导入的数据条数
hive (hive_sogou)> select count(*) from sogouq1;
Starting Job = job_1586223718291_0011, Tracking URL = http://centoshadoop1:8088/proxy/application_1586223718291_0011/
Kill Command = /home/hadoop/hadoop-ha/hadoop/hadoop-2.8.5/bin/hadoop job -kill job_1586223718291_0011
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2020-04-09 15:36:32,833 Stage-1 map = 0%, reduce = 0%
2020-04-09 15:36:51,281 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 6.06 sec
2020-04-09 15:37:03,865 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 10.34 sec
MapReduce Total cumulative CPU time: 10 seconds 340 msec
Ended Job = job_1586223718291_0011
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 10.34 sec HDFS Read: 152018432 HDFS Write: 107 SUCCESS
Total MapReduce CPU Time Spent: 10 seconds 340 msec
OK
_c0
1724264
Time taken: 96.121 seconds, Fetched: 1 row(s)
启动Spark集群
bin/spark-shell --master spark://centoshadoop1:7077
通过Spark-shell 来使用Spark SQL
场景一:通过Spark SQL读取HDFS中的people.json文件,并查看操作这个数据文件。具体运行情况如下:
### 创建sqlContext上下文对象
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext:
org.apache.spark.sql.SQLContext = [email protected]
### 读取json中的数据HDFS中的people.json文件,并查看操作这个数据文件。具体运行情况如下:
scala> val df = sqlContext.read.json("/library/SparkSQL/Data/people.json");
df: org.apache.spark.sql.DataFrame = [age: bigint, depId: bigint ... 4 more fields]
注意:/library/SparkSQL/Data/ 路径必须为集群的物理路径 必须是通过hadoop命令创建的目录路径
### 查看dataFrame的内容
scala> df.show
+---+-----+------+----------+-------+------+
|age|depId|gender|job number| name|salary|
+---+-----+------+----------+-------+------+
| 33| 1| male| 001|Mechael| 3000|
| 30| 2|female| 002| Andy| 4000|
| 19| 3| male| 003| Justin| 5000|
| 32| 1| male| 004| John| 6000|
| 20| 2|female| 005| Herry| 7000|
| 26| 3| male| 006| Jack| 3000|
+---+-----+------+----------+-------+------+
### 查看dataFrame的树形结构
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- depId: long (nullable = true)
|-- gender: string (nullable = true)
|-- job number: string (nullable = true)
|-- name: string (nullable = true)
|-- salary: long (nullable = true)
### 只查看name这一列所有数据,并且显示出来
scala> df.select("name").show
+-------+
| name|
+-------+
|Mechael|
| Andy|
| Justin|
| John|
| Herry|
| Jack|
+-------+
### 查看name,和age+1的结果,并且show出来
scala> df.select(df("name"),df("age")+1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Mechael| 34|
| Andy| 31|
| Justin| 20|
| John| 33|
| Herry| 21|
| Jack| 27|
+-------+---------+
### 选出年龄大于21岁的人,并且显示出来
scala> df.filter(df("age")>21).show
+---+-----+------+----------+-------+------+
|age|depId|gender|job number| name|salary|
+---+-----+------+----------+-------+------+
| 33| 1| male| 001|Mechael| 3000|
| 30| 2|female| 002| Andy| 4000|
| 32| 1| male| 004| John| 6000|
| 26| 3| male| 006| Jack| 3000|
+---+-----+------+----------+-------+------+
启动Spark SQL
bin/spark-sql --master spark://centoshadoop1:7077
Spark SQL的CLI(命令终端)是一个方便的工具,以本地方式运行在Hive的元数据服务上,可以直接在命令行中输入查询语句进行查询。不过需要注意的是,SparkSQL的CLI不能操作Thrift JDBC Server。下面介绍如何使用CLI。
show databases; ---- 显示库
show tables; ---- 显示表
desc tbdate ---- 查看tbdate表的结构
select count(*) from tbdata limit 10; ---- 查看表的前10行数据
简介DataFrame编程模型
DataFrame类似于关系数据库的表,这样组件的引入,简化了编程复杂度,极大地方便了Spark SQL的应用。Spark引入DataFrame实现了在大数据平台同样的统计分析功能。随着不断地优化,DataFrame可以有更广阔的数据来源创建,例如:结构化的数据文件,Hive表,外部数据库或者现有的RDD。
下图是RDD与DataFrame进行对比
(1)RDD的每一项数据都是一个整体,这也就导致了Spark框架的无法洞悉数据记录内部的细节,限制了Spark SQL的性能提升。
(2)DataFrame的数据特点如上图所示,其中包含了每个数据记录的Metadata信息,可以在优化时基于列内部进行优化(例如一共30列,如果只需要其中10列,那么就可以只获取其中10列的信息,而不需要把所有30列数据全部取出)。
DataFrame更像是RDD的加强版,带有更多的细节信息,与普通的RDD不同的是DataFrame是有Schema(标记)的,也就是说,DataFrame是带有每一列信息的。可以把DataFrame理解为一个分布式的二维表,每一列都带有名称和类型,这意味着Spark SQL可以基于每一列数据的元数据进行更加细粒度的分析,而不是如同以往分析RDD的时候那中粗粒度的分析。于是基于DataFrame就可以更加高效的性能优化.