PySpark基于SHC框架读取MySQL数据转成DataFrame写入HBase
一、准备工作请参考本人之前的文章
二、代码
#/usr/bin/python
#-*- coding:utf-8 –*-
from pyspark import SparkContext
from pyspark.sql import SQLContext,HiveContext,SparkSession
from pyspark.sql.types import Row,StringType,StructField,StringType,IntegerType
from pyspark.sql.dataframe import DataFrame
sc = SparkContext(appName="pyspark_hbase")
sql_sc = SQLContext(sc)
jdbc_url_test = 'jdbc:mysql://192.168.****/xxx?' \
'user=*******&' \
'password=*******&' \
'useUnicode=true&' \
'characterEncoding=utf8&' \
'TreatTinyAsBoolean=false&' \
'tinyInt1isBit=false'
jdbc_driver = 'com.mysql.jdbc.Driver'
df_test_HBase = sql_sc.read.format('jdbc').options(url=jdbc_url_test,driver=jdbc_driver,dbtable='testHBase').load()
df_test_HBase.createOrReplaceTempView("test_HBase")
#shc对df中数值型的编码有些坑,直接写HBase会出现数值类型字段乱码,本人暂时没填坑,只是绕开坑走了,先在df中吧数值型转成String再写HBase
df_cast_HBase = sql_sc.sql("select CAST(id as String) id,name,CAST(age as String) age,CAST(gender as String) gender,cat,tag,level from test_HBase")
df_cast_HBase.show()
dep = "org.apache.spark.sql.execution.datasources.hbase"
#创建schema
catalog = """{
"table":{"namespace":"default", "name":"teacher", "tableCoder":"PrimitiveType"},
"rowkey":"key",
"columns":{
"id":{"cf":"rowkey", "col":"key", "type":"int"},
"name":{"cf":"teacherBase", "col":"name", "type":"string"},
"age":{"cf":"teacherBase", "col":"age", "type":"int"},
"gender":{"cf":"teacherBase", "col":"gender","type":"int"},
"cat":{"cf":"teacherDetails", "col":"cat","type":"string"},
"tag":{"cf":"teacherDetails", "col":"tag", "type":"string"},
"level":{"cf":"teacherDetails", "col":"level","type":"string"} }
} """
#newTable:如果是只是将数据写到HBase中存在的表就不用设置,建新表才需要而且值要大于3
df_cast_HBase.write.options(catalog=catalog,newTable="5").format(dep).save()
print ('***************************************************************')
print ('******************* 写 入 成 功 ******************')
print ('***************************************************************')
sc.stop()
三、结果
四、问题
就像我代码中注释说的一样,目前本人还没找到填坑的方法,只能绕路走,如果有填过该坑的朋友欢迎留言,附问题截图
上边是直接写HBase的结果,下面是绕坑走后写入到HBase中的数据