HBaseRDD -- Wrapping Spark on HBase Operations into a Spark RDD
While reading the Spark source code recently, I noticed that Spark uses implicit conversions to extend RDDs with extra functionality: for example, PairRDDFunctions extends RDD with methods such as reduceByKey and groupByKey. A while back we used Spark to operate on HBase, and because we were rushing to go live we never wrapped that functionality properly. Looking back at it now, we can mimic the implementation of PairRDDFunctions and build an HBaseRDDFunctions class that wraps the basic HBase operations behind an implicit conversion.
The implementation of HBaseRDDFunctions is as follows:
import java.io.IOException
import java.util
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.KeyValue.Type
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.language.implicitConversions
import scala.collection.mutable.ListBuffer
/**
* Created by chenjian2.zh on 2018/4/26.
*/
class HBaseRDDFunctions[T](self: RDD[T]) {
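  /**
   * Shared bulk-load path: writes the (rowkey, KeyValue) RDD as HFiles under
   * tmpHFilePath via HFileOutputFormat2, then moves them into the target table
   * with LoadIncrementalHFiles.
   */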
private def saveToHBase(sc: SparkContext,
tableName: String,
hbaseRDD: RDD[(ImmutableBytesWritable, KeyValue)],
tmpHFilePath: String
): Unit = {
var conn: Connection = null
var realTable: Table = null
val hbaseConf = HBaseConfiguration.create(sc.hadoopConfiguration)
val hbTableName = TableName.valueOf(tableName)
val job = Job.getInstance(hbaseConf)
val fs = FileSystem.get(sc.hadoopConfiguration)
    // Delete any leftover output directory up front; saveAsNewAPIHadoopFile fails if the path already exists
    fs.delete(new Path(tmpHFilePath), true)
try {
conn = ConnectionFactory.createConnection(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
realTable = conn.getTable(hbTableName)
HFileOutputFormat2.configureIncrementalLoad(job, realTable, conn.getRegionLocator(hbTableName))
hbaseRDD.saveAsNewAPIHadoopFile(tmpHFilePath, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
val loader = new LoadIncrementalHFiles(hbaseConf)
loader.doBulkLoad(new Path(tmpHFilePath), realTable.asInstanceOf[HTable])
} finally {
      // Clean up the temporary HFile directory once the bulk load has finished
      fs.delete(new Path(tmpHFilePath), true)
if (realTable != null) {
realTable.close()
}
if (conn != null) {
conn.close()
}
}
}
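  /**
   * Bulk delete via HFiles: sorts the row keys produced by transfer and
   * bulk-loads DeleteColumn tombstone KeyValues for them.
   */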
def hbaseBulkDeleteColumn(sc: SparkContext,
tableName: String,
tmpHFilePath: String,
transfer: T => Array[Byte]
): Unit = {
    implicit val rowKeyOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
override def compare(left: Array[Byte], right: Array[Byte]) = {
Bytes.compareTo(left, right)
}
}
val ts = System.currentTimeMillis()
val hbaseRdd = self.map(rd => {
(transfer(rd), Type.DeleteColumn.getCode)
}).sortByKey().map(rd => {
(new ImmutableBytesWritable(rd._1), new KeyValue(rd._1, ts, Type.DeleteColumn))
})
saveToHBase(sc, tableName, hbaseRdd, tmpHFilePath)
}
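  /**
   * Bulk write via HFiles: transfer maps each element to
   * (rowKey, family, qualifier, value) tuples, which are globally sorted on the
   * concatenated rowKey + family + qualifier bytes before HFile generation.
   */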
def hbaseBulkLoad(sc: SparkContext,
tableName: String,
tmpHFilePath: String,
transfer: T => List[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])],
kvType: Type = Type.Put
): Unit = {
    implicit val rowKeyOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
override def compare(left: Array[Byte], right: Array[Byte]) = {
Bytes.compareTo(left, right)
}
}
val ts = System.currentTimeMillis()
val hbaseRDD = self.flatMap(transfer).map(rd => {
val rowKey = rd._1
val columnFamily = rd._2
val qualifier = rd._3
val value = rd._4
val key = new Array[Byte](rowKey.length + columnFamily.length + qualifier.length)
System.arraycopy(rowKey, 0, key, 0, rowKey.length)
System.arraycopy(columnFamily, 0, key, rowKey.length, columnFamily.length)
System.arraycopy(qualifier, 0, key, rowKey.length + columnFamily.length, qualifier.length)
(key, (rowKey.length, columnFamily.length, value))
}).sortByKey().map(rd => {
val rowKey = new Array[Byte](rd._2._1)
val columnFamily = new Array[Byte](rd._2._2)
val qualifier = new Array[Byte](rd._1.length - rd._2._1 - rd._2._2)
System.arraycopy(rd._1, 0, rowKey, 0, rd._2._1)
System.arraycopy(rd._1, rd._2._1, columnFamily, 0, rd._2._2)
System.arraycopy(rd._1, rd._2._1 + rd._2._2, qualifier, 0, rd._1.length - rd._2._1 - rd._2._2)
val kv = new KeyValue(rowKey, columnFamily, qualifier, ts, kvType, rd._2._3)
(new ImmutableBytesWritable(rowKey), kv)
})
saveToHBase(sc, tableName, hbaseRDD, tmpHFilePath)
}
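  /** Bulk-delete variant of hbaseBulkLoad: emits DeleteColumn tombstones instead of Puts. */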
def hbaseBulkDelete(sc: SparkContext,
tableName: String,
tmpHFilePath: String,
transfer: T => List[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])]
): Unit = {
hbaseBulkLoad(sc, tableName, tmpHFilePath, transfer, Type.DeleteColumn)
}
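  /**
   * Random reads: maps each element to a Get and executes the Gets in batches
   * of `num` per partition, returning the Results as a new RDD.
   */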
def hbaseGet(sc: SparkContext,
tableName: String,
transfer: T => Get,
zkQuorum: String,
zkPort: String,
num: Int = 100
): RDD[Result] = {
self.map(transfer).mapPartitions(it => {
val lb = new ListBuffer[Result]()
val batch = new util.ArrayList[Row]
var realTable: Table = null
val hbTableName = TableName.valueOf(tableName)
var conn: Connection = null
val hbConf = HBaseConfiguration.create()
hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
hbConf.set("hbase.zookeeper.quorum", zkQuorum)
try {
conn = ConnectionFactory.createConnection(hbConf)
realTable = conn.getTable(hbTableName)
while (it.hasNext) {
val get = it.next()
get.setCacheBlocks(false)
batch.add(get)
        if (batch.size >= num) {
          val results = new Array[Object](batch.size)
          realTable.batch(batch, results)
          results.foreach(x => {
            lb += x.asInstanceOf[Result]
          })
          // Clear the batch, otherwise already-executed Gets are re-submitted and duplicated
          batch.clear()
        }
}
if (batch.size() != 0) {
val results = new Array[Object](batch.size)
realTable.batch(batch, results)
results.foreach(x => {
lb += x.asInstanceOf[Result]
})
}
} finally {
        if (realTable != null) {
          try {
            // Close the Table instance
            realTable.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
        if (conn != null) {
          try {
            // Close the HBase connection
            conn.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
}
lb.iterator
})
}
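  /** Online (non-bulk) delete: each element is mapped to a Delete and sent through hbaseMutation. */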
def hbaseDelete(sc: SparkContext,
tableName: String,
transfer: T => Delete,
zkQuorum: String,
zkPort: String,
num: Int = 100
): Unit = {
hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)
}
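  /**
   * Generic online write path: applies arbitrary Mutations (Put, Delete,
   * Increment, ...) in batches of `num` per partition.
   */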
def hbaseMutation(sc: SparkContext,
tableName: String,
transfer: T => Mutation,
zkQuorum: String,
zkPort: String,
num: Int = 100
): Unit = {
self.foreachPartition(it => {
var conn: Connection = null
var realTable: Table = null
val hbConf = HBaseConfiguration.create()
hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
hbConf.set("hbase.zookeeper.quorum", zkQuorum)
val hbTableName = TableName.valueOf(tableName)
try {
conn = ConnectionFactory.createConnection(hbConf)
realTable = conn.getTable(hbTableName)
val mutationList = new java.util.ArrayList[Mutation]
while (it.hasNext) {
mutationList.add(transfer(it.next()))
if (mutationList.size >= num) {
realTable.batch(mutationList, null)
mutationList.clear()
}
}
if (mutationList.size > 0) {
realTable.batch(mutationList, null)
mutationList.clear()
}
} finally {
        if (realTable != null) {
          try {
            // Close the Table instance
            realTable.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
        if (conn != null) {
          try {
            // Close the HBase connection
            conn.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
}
})
}
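  /**
   * Runs one Scan per element and collects all matching Results into a new RDD.
   * Note that each partition's results are buffered in memory before the
   * iterator is returned.
   */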
def hbaseScan(sc: SparkContext,
tableName: String,
transfer: T => Scan,
zkQuorum: String,
zkPort: String
): RDD[Result] = {
self.mapPartitions(it => {
val lb = new ListBuffer[Result]()
var conn: Connection = null
var realTable: Table = null
val hbConf = HBaseConfiguration.create()
hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
hbConf.set("hbase.zookeeper.quorum", zkQuorum)
val hbTableName = TableName.valueOf(tableName)
try {
conn = ConnectionFactory.createConnection(hbConf)
realTable = conn.getTable(hbTableName)
        while (it.hasNext) {
          val scan = transfer(it.next())
          scan.setCacheBlocks(false)
          val scanner = realTable.getScanner(scan)
          try {
            val scannerIt = scanner.iterator
            while (scannerIt.hasNext) {
              lb += scannerIt.next()
            }
          } finally {
            // Close every scanner, not just the last one
            scanner.close()
          }
        }
} finally {
        if (realTable != null) {
          try {
            // Close the Table instance
            realTable.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
        if (conn != null) {
          try {
            // Close the HBase connection
            conn.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
}
lb.iterator
})
}
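  /** Online (non-bulk) write: each element is mapped to a Put and sent through hbaseMutation. */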
def hbasePut(sc: SparkContext,
tableName: String,
transfer: T => Put,
zkQuorum: String,
zkPort: String,
num: Int = 100
): Unit = {
hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)
}
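  /** Counter updates: each element is mapped to an Increment and sent through hbaseMutation. */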
def hbaseIncrement(sc: SparkContext,
tableName: String,
transfer: T => Increment,
zkQuorum: String,
zkPort: String,
num: Int = 100
): Unit = {
hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)
}
}
object HBaseRDDFunctions {
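  /**
   * Builds an RDD over a table Scan by serializing the Scan into the
   * TableInputFormat configuration.
   */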
def fromScan(sc: SparkContext,
tableName: String,
scan: Scan
): RDD[(ImmutableBytesWritable, Result)] = {
scan.setCacheBlocks(false)
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)
hbConf.set(TableInputFormat.INPUT_TABLE, tableName)
hbConf.set(TableInputFormat.SCAN, scanToString)
sc.newAPIHadoopRDD(hbConf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
}
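  /**
   * The implicit conversion: importing this extends every RDD[T] with the HBase
   * methods above, mirroring how Spark exposes PairRDDFunctions.
   */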
implicit def rddToHBaseRDD[T](rdd: RDD[T]): HBaseRDDFunctions[T] = {
new HBaseRDDFunctions(rdd)
}
}
Note: this targets HBase 1.0.2 and Spark 2.1.0.
How to use it:
1. Import the implicit conversion function from the HBaseRDDFunctions companion object.
2. After the import, Spark RDDs are extended with our own Spark-on-HBase methods, for example:
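A minimal sketch of an online put; the table name "t1", column family "cf", qualifier "q", and the ZooKeeper quorum and port below are placeholder values, not from a real deployment:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import HBaseRDDFunctions._ // brings the rddToHBaseRDD implicit conversion into scope

val sc = new SparkContext(new SparkConf().setAppName("hbase-rdd-demo"))
val rdd = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

// After the import, hbasePut is available directly on the RDD.
// "t1", "cf", "q", the quorum and the port are placeholders.
rdd.hbasePut(sc, "t1", { case (rowKey, value) =>
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value))
  put
}, "zk1,zk2,zk3", "2181")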