Spark SQL - 6. Operating on HBase with Spark SQL

Goal:

Create a table in HBase and operate on it with Spark SQL.

 

References:

https://blog.csdn.net/eyeofeagle/article/details/84571756

https://blog.csdn.net/eyeofeagle/article/details/89943913

 

HBase data preparation:

Enter the HBase shell:

# hbase shell

Create table p1 with column family f:

hbase(main):010:0> create 'p1','f'

Insert data (the put syntax is: put 'table', 'row key', 'column family:column', 'value'):

hbase(main):008:0> put 'p1','r1','f:age','23'
hbase(main):009:0> put 'p1','r1','f:name','zhangsan'
hbase(main):010:0> put 'p1','r2','f:age','25'
hbase(main):011:0> put 'p1','r2','f:name','lisi'
hbase(main):012:0> put 'p1','r3','f:age','34'
hbase(main):013:0> put 'p1','r3','f:name','wangwu'
hbase(main):014:0> put 'p1','r4','f:age','25'
hbase(main):015:0> put 'p1','r4','f:name','zhaoliu'
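
The same inserts can also be issued from Scala through the HBase client API rather than the shell. A minimal sketch, assuming the HBase client jars are on the classpath and hbase-site.xml is reachable; the object name PutRows and the single row shown are for illustration only:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper mirroring the shell puts above
object PutRows {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      val table = conn.getTable(TableName.valueOf("p1"))
      // put 'p1','r1','f:name','zhangsan' becomes:
      val p = new Put(Bytes.toBytes("r1"))
      p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"))
      table.put(p)
    } finally conn.close()
  }
}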

View table p1:

hbase(main):016:0> scan 'p1'
ROW             COLUMN+CELL
 r1             column=f:age, timestamp=1559221341908, value=23
 r1             column=f:name, timestamp=1559221371077, value=zhangsan
 r2             column=f:age, timestamp=1559221388518, value=25
 r2             column=f:name, timestamp=1559221405222, value=lisi
 r3             column=f:age, timestamp=1559221422532, value=34
 r3             column=f:name, timestamp=1559221443076, value=wangwu
 r4             column=f:age, timestamp=1559221466632, value=25
 r4             column=f:name, timestamp=1559221475429, value=zhaoliu
4 row(s) in 0.0180 seconds
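
Before wiring Spark in, it can help to confirm the table is readable from plain Scala through the HBase client API. A minimal standalone sketch, assuming the HBase client jars are on the classpath and hbase-site.xml is reachable; the object name ScanCheck is made up for illustration:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Hypothetical check, separate from the tutorial code: prints the same rows as `scan 'p1'`
object ScanCheck {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      val table = conn.getTable(TableName.valueOf("p1"))
      val scanner = table.getScanner(new Scan())
      for (result <- scanner.asScala) {
        val row  = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        val age  = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
        println(s"$row -> name=$name, age=$age")
      }
      scanner.close()
    } finally conn.close()
  }
}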

 

Start Hadoop and HBase (both must be running before the HBase shell steps above and before the Spark job):

start-all.sh
start-hbase.sh

 

Jar dependencies:

Add the HBase jars to the IDEA project; the HBase jars are under /hbase/lib/.
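
If the project uses sbt rather than hand-copied jars, the same dependencies can be declared in build.sbt. A sketch only; the version numbers are assumptions and must match your cluster (in HBase 1.x, TableInputFormat ships in hbase-server; in HBase 2.x it moved to hbase-mapreduce):

// build.sbt sketch -- versions are assumptions, align them with your cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"    % "2.4.0",
  "org.apache.hbase" %  "hbase-client" % "1.4.9",
  "org.apache.hbase" %  "hbase-server" % "1.4.9"  // provides TableInputFormat in HBase 1.x
)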


 

 

IDEA code:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object hbase_test {

  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder()
      .appName("hbase")
      .master("local")
      .getOrCreate()

    // Connect to HBase table p1 and read it as an RDD of (rowkey, Result) pairs
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "p1")
    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    // Pull the name and age cells out of each Result, producing (name, age) tuples
    val tupRDD = hbaseRDD.map {
      case (_, result) =>
        val name = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        val age = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
        (name, age)
    }

    // Convert the RDD to a DataFrame, register a temp view, and query it
    import spark.implicits._
    val df = tupRDD.toDF("name", "age")
    df.createOrReplaceTempView("person")
    val df2 = spark.sql("select * from person")
    df2.show()
  }
}
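
With the sample data above, df2.show() should list the four (name, age) rows. Note that both columns come back from HBase as strings, so any further SQL on the view compares string values; a small follow-up query against the same temp view, continuing the session above:

// Ages are strings here, so compare against a string literal
val adults = spark.sql("select name, age from person where age = '25'")
adults.show()  // with the sample data: lisi and zhaoliu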
