使用pyspark执行hive sql

配置环境

环境配置这里就不再多讲,只研究执行效率的对比

  1. spark
  2. hadoop

执行模式

假设一个查询host出数量的sql是这样:

select host,count(distinct c.mobile) as mobile_num from xml.my_goods d 
right join ( select b.xmsec as mobile from( 
    select mobile_id from xll.xf_shenzhen where dt = '2018-08-31') a  
    left join zww.nami b on a.mobile_id = b.mobile_id  
    where b.money is not null ) c on upper(d.mobile) = upper(c.mobile) 
where dt >= '20180827' and c.mobile is not null 
group by host

hive模式

直接把上面的sql放到hue的hive工作台中执行即可

pyspark模式

使用pyspark执行hive sql
共三个文件:

  1. run.sh:执行文件,内容是一个执行py脚本的命令
spark2-submit --master local[*] spark_test.py
  1. spark_test.py:pyspark脚本,作用是执行sql,并把结果保存到hive上
import datetime 
import sql_conf as xml 
from pyspark import SparkConf,SparkContext 
from pyspark.sql import HiveContext 
 
filename='/user/liuxunming/spark_result_'+datetime.datetime.now().strftime('%Y%m%d%H%M%S') 
 
conf = SparkConf() 
conf.setAppName('url rate') 
sc = SparkContext(conf=conf) 
 
hc = HiveContext(sc) 
url_table = hc.sql(xml.first_sql).repartition(1).write.csv(filename) 
 
sc.stop() 
  1. sql_conf.py:sql配置文件,里面可以配置n条sql语句,要注意的是每条sql前后必须用三个单引号,这是规定的格式。
first_sql='''select host,count(distinct c.mobile) as mobile_num from xml.my_goods d 
right join ( select b.xmsec as mobile from( 
    select mobile_id from xll.xf_shenzhen where dt = '2018-08-31') a  
    left join zww.nami b on a.mobile_id = b.mobile_id  
    where b.money is not null ) c on upper(d.mobile) = upper(c.mobile) 
where dt >= '20180827' and c.mobile is not null 
group by host''' 
secend_sql='''select name from liuxunming.catch_product'''

最后直接执行run.sh脚本即可

./run.sh

最终结果

hive模式

使用pyspark执行hive sql
直接在hue平台hive界面上输入sql,执行耗时20min22s

pyspark模式

使用pyspark执行hive sql
使用pyspark执行hive sql
可以看到执行耗时为4min,是普通的hive模式耗时的五分之一

错误解决

错误一:字节编码

SyntaxError: Non-ASCII character '\xe8' in file /home/liuxunming/url_rate/sql_conf.py on line 14, but no encoding declared; see http://www.python.org/peps/pep-0263.html for details

在sql_conf.py第一行加上配置

另外sql传参中有中文的时候需要添加一个编码的头

# -*- coding: utf-8 -*-

错误二:传递参数

string.format
关键字模式

‘select * from run where dt <= {today} and dt >{yesterday}’.format(today='20180901',yesterday='20180831')

上面这种方式可能有问题,使用下面这种位置模式

‘select * from run where dt <= {0} and dt >{1}’.format('20180901','20180831')

事实证明这样传参也不行
最后的解决办法就是在sql传参的花括号前后添加单引号

'{}'

后续

传参以后,可以把参数设置成每天的日期,然后把结果直接保存到hive表中,然后再通过sqoop把hive数据导入到mysql中,最后再调用linux的crontab定时器,就能实现一个自动化

参考文章

  1. 使用PySpark编写SparkSQL程序查询Hive数据仓库
  2. SyntaxError: Non-ASCII character ‘\xe6’ in file
  3. mysql函数之截取字符串
  4. Python中单引号,双引号,3个单引号及3个双引号的区别
  5. python之字符串格式化(format)