PySpark: Reading MySQL Data into a DataFrame and Writing It to HBase via the SHC Framework

1. Preparation: see my earlier article for the environment setup
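
For context, the job needs the SHC package and a MySQL JDBC driver on the Spark classpath at submit time. A minimal launch sketch, assuming the Hortonworks SHC coordinates and a MySQL connector jar (the versions and the script name pyspark_hbase.py are placeholders; adjust to your environment):

spark-submit \
  --repositories http://repo.hortonworks.com/content/groups/public/ \
  --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \
  --jars mysql-connector-java-5.1.47.jar \
  pyspark_hbase.py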

2. Code

#!/usr/bin/python
# -*- coding: utf-8 -*-

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="pyspark_hbase")
sql_sc = SQLContext(sc)
jdbc_url_test = 'jdbc:mysql://192.168.****/xxx?' \
                'user=*******&' \
                'password=*******&' \
                'useUnicode=true&' \
                'characterEncoding=utf8&' \
                'TreatTinyAsBoolean=false&' \
                'tinyInt1isBit=false'
jdbc_driver = 'com.mysql.jdbc.Driver'
df_test_HBase = sql_sc.read.format('jdbc').options(url=jdbc_url_test,driver=jdbc_driver,dbtable='testHBase').load()
df_test_HBase.createOrReplaceTempView("test_HBase")
# SHC has a pitfall with numeric column encoding: writing numeric DataFrame fields
# to HBase directly produces garbled-looking values. I haven't fixed the root cause
# yet, only routed around it: cast the numeric columns to String in the DataFrame
# before writing to HBase.
df_cast_HBase = sql_sc.sql("select CAST(id as String) id,name,CAST(age as String) age,CAST(gender as String) gender,cat,tag,level from test_HBase")
df_cast_HBase.show()
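# For reference, the same cast with the DataFrame API instead of SQL would look
# roughly like this (a sketch, equivalent to the CAST statement above):
#   from pyspark.sql.functions import col
#   df_cast_HBase = df_test_HBase.select(
#       col('id').cast('string').alias('id'), 'name',
#       col('age').cast('string').alias('age'),
#       col('gender').cast('string').alias('gender'),
#       'cat', 'tag', 'level')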
dep = "org.apache.spark.sql.execution.datasources.hbase"
# Define the SHC catalog (the schema mapping between DataFrame columns and HBase cells)
catalog = """{
              "table":{"namespace":"default", "name":"teacher", "tableCoder":"PrimitiveType"},
              "rowkey":"key",
              "columns":{
                       "id":{"cf":"rowkey", "col":"key", "type":"int"},
                       "name":{"cf":"teacherBase", "col":"name", "type":"string"},
                       "age":{"cf":"teacherBase", "col":"age", "type":"int"},
                       "gender":{"cf":"teacherBase", "col":"gender","type":"int"},
                       "cat":{"cf":"teacherDetails", "col":"cat","type":"string"},
                       "tag":{"cf":"teacherDetails", "col":"tag", "type":"string"},
                       "level":{"cf":"teacherDetails", "col":"level","type":"string"}  }
            } """
# newTable: not needed when writing to an existing HBase table; set it only when
# creating a new table, and its value (the initial region count) must be greater than 3.
df_cast_HBase.write.options(catalog=catalog,newTable="5").format(dep).save()
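# To sanity-check the write, the same catalog can be reused to read the table
# back through SHC (a sketch):
#   df_read = sql_sc.read.options(catalog=catalog).format(dep).load()
#   df_read.show()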
print('***************************************************************')
print('*******************     write succeeded     ******************')
print('***************************************************************')
sc.stop()

3. Result

[Screenshot: job output showing the data written to the HBase table]

4. Problem

As the comment in the code says, I haven't found a proper fix for this pitfall yet and can only route around it. If you have solved it, please leave a comment. The problem screenshot is attached below.

[Screenshot: garbled numeric values in HBase after writing directly]

The screenshot above shows the result of writing to HBase directly; below it is the data written to HBase after the workaround.
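
For what it's worth, the "garbled" values are most likely not corruption: SHC serializes integer DataFrame columns with HBase's Bytes.toBytes(int), i.e. as 4-byte big-endian binary, and the hbase shell prints those raw bytes as \xNN escapes. A minimal sketch in plain Python (standard library only) of what such a cell actually contains:

import struct

# 25 encoded the way HBase's Bytes.toBytes(int) encodes it:
# four bytes, big-endian two's complement.
raw = struct.pack('>i', 25)
print(raw)  # b'\x00\x00\x00\x19' -- the hbase shell renders this as \x00\x00\x00\x19

So the cast-to-String workaround does not fix an encoding bug so much as trade binary cells for human-readable UTF-8 ones; a reader that decodes the bytes (or an SHC read with the int catalog type) should still recover the original numbers.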