HBaseRDD -- Wrapping Spark on HBase Operations into a Spark RDD

While reading the Spark source code recently, I noticed that Spark uses implicit conversions to extend RDDs with extra functionality: PairRDDFunctions, for example, extends RDD[(K, V)] with methods such as reduceByKey. Some time ago we used Spark to operate on HBase, but because we were rushing to go live we never wrapped that functionality properly. Looking back, we could have followed the PairRDDFunctions approach and built an HBaseRDDFunctions class that wraps the basic HBase operations behind an implicit conversion.
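As a standalone illustration of the pattern (the class and method names below are made up for this sketch and are not part of Spark or of the code that follows), an implicit conversion defined in a companion object turns any RDD into a wrapper class, so the wrapper's methods look like methods of the RDD itself:

import org.apache.spark.rdd.RDD
import scala.language.implicitConversions

// StringRDDFunctions is a made-up name used only to illustrate the pattern
class StringRDDFunctions(self: RDD[String]) {
  // a trivial "extension" method: total number of characters across the RDD
  def totalLength(): Long = self.map(_.length.toLong).fold(0L)(_ + _)
}

object StringRDDFunctions {
  // once `import StringRDDFunctions._` is in scope,
  // rdd.totalLength() compiles for any RDD[String]
  implicit def rddToStringRDDFunctions(rdd: RDD[String]): StringRDDFunctions =
    new StringRDDFunctions(rdd)
}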

 

The implementation of HBaseRDDFunctions is as follows:

import java.io.IOException

import java.util

 

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.KeyValue.Type

import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}

import org.apache.hadoop.hbase.client._

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableInputFormat}

import org.apache.hadoop.hbase.protobuf.ProtobufUtil

import org.apache.hadoop.hbase.util.{Base64, Bytes}

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import scala.language.implicitConversions

import scala.collection.mutable.ListBuffer

 

/**

  * Created by chenjian2.zh on 2018/4/26.

  */

 

class HBaseRDDFunctions[T](self: RDD[T]) {

 

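  // Configures HFileOutputFormat2 for the target table, writes the sorted
  // (rowkey, KeyValue) RDD as HFiles under tmpHFilePath, bulk-loads them with
  // LoadIncrementalHFiles, and finally closes the table and connection.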
  private def saveToHBase(sc: SparkContext,

                          tableName: String,

                          hbaseRDD: RDD[(ImmutableBytesWritable, KeyValue)],

                          tmpHFilePath: String

                         ): Unit = {

 

    var conn: Connection = null

    var realTable: Table = null

    val hbaseConf = HBaseConfiguration.create(sc.hadoopConfiguration)

    val hbTableName = TableName.valueOf(tableName)

    val job = Job.getInstance(hbaseConf)

    val fs = FileSystem.get(sc.hadoopConfiguration)

    // remove any leftover temp HFile directory up front; otherwise
    // saveAsNewAPIHadoopFile fails if the output path already exists
    if (fs.exists(new Path(tmpHFilePath))) {
      fs.delete(new Path(tmpHFilePath), true)
    }

 

    try {

      conn = ConnectionFactory.createConnection(hbaseConf)

      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

      job.setMapOutputValueClass(classOf[KeyValue])

      realTable = conn.getTable(hbTableName)

      HFileOutputFormat2.configureIncrementalLoad(job, realTable, conn.getRegionLocator(hbTableName))

      hbaseRDD.saveAsNewAPIHadoopFile(tmpHFilePath, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)

      val loader = new LoadIncrementalHFiles(hbaseConf)

      loader.doBulkLoad(new Path(tmpHFilePath), realTable.asInstanceOf[HTable])

 

    } finally {

      // clean up the temp HFile directory right away (deleteOnExit would only
      // delete it when the FileSystem is closed, typically at JVM exit)
      fs.delete(new Path(tmpHFilePath), true)

      if (realTable != null) {

        realTable.close()

      }

      if (conn != null) {

        conn.close()

      }

    }

  }

 

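  // Bulk-load variant that emits one DeleteColumn marker KeyValue per row key
  // produced by transfer; the markers are sorted by row key and loaded as HFiles.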
  def hbaseBulkDeleteColumn(sc: SparkContext,

                            tableName: String,

                            tmpHFilePath: String,

                            transfer: T => Array[Byte]

                           ): Unit = {

 

    implicit val rowKeyOrdering = new Ordering[Array[Byte]] {

      override def compare(left: Array[Byte], right: Array[Byte]) = {

        Bytes.compareTo(left, right)

      }

    }

 

    val ts = System.currentTimeMillis()

    val hbaseRdd = self.map(rd => {

      (transfer(rd), Type.DeleteColumn.getCode)

    }).sortByKey().map(rd => {

      (new ImmutableBytesWritable(rd._1), new KeyValue(rd._1, ts, Type.DeleteColumn))

    })

    saveToHBase(sc, tableName, hbaseRdd, tmpHFilePath)

  }

 

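  // Bulk-loads the RDD into HBase: transfer maps each element to
  // (rowKey, columnFamily, qualifier, value) cells, which are globally sorted
  // by rowKey + family + qualifier before being written as HFiles.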
  def hbaseBulkLoad(sc: SparkContext,

                    tableName: String,

                    tmpHFilePath: String,

                    transfer: T => List[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])],

                    kvType: Type = Type.Put

                   ): Unit = {

    implicit val rowKeyOrdering = new Ordering[Array[Byte]] {

      override def compare(left: Array[Byte], right: Array[Byte]) = {

        Bytes.compareTo(left, right)

      }

    }

 

    val ts = System.currentTimeMillis()

 

    val hbaseRDD = self.flatMap(transfer).map(rd => {

      val rowKey = rd._1

      val columnFamily = rd._2

      val qualifier = rd._3

      val value = rd._4

      val key = new Array[Byte](rowKey.length + columnFamily.length + qualifier.length)

      System.arraycopy(rowKey, 0, key, 0, rowKey.length)

      System.arraycopy(columnFamily, 0, key, rowKey.length, columnFamily.length)

      System.arraycopy(qualifier, 0, key, rowKey.length + columnFamily.length, qualifier.length)

      (key, (rowKey.length, columnFamily.length, value))

    }).sortByKey().map(rd => {

      val rowKey = new Array[Byte](rd._2._1)

      val columnFamily = new Array[Byte](rd._2._2)

      val qualifier = new Array[Byte](rd._1.length - rd._2._1 - rd._2._2)

      System.arraycopy(rd._1, 0, rowKey, 0, rd._2._1)

      System.arraycopy(rd._1, rd._2._1, columnFamily, 0, rd._2._2)

      System.arraycopy(rd._1, rd._2._1 + rd._2._2, qualifier, 0, rd._1.length - rd._2._1 - rd._2._2)

      val kv = new KeyValue(rowKey, columnFamily, qualifier, ts, kvType, rd._2._3)

      (new ImmutableBytesWritable(rowKey), kv)

    })

    saveToHBase(sc, tableName, hbaseRDD, tmpHFilePath)

  }

 

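  // Same as hbaseBulkLoad, but writes DeleteColumn markers instead of Puts.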
  def hbaseBulkDelete(sc: SparkContext,

                      tableName: String,

                      tmpHFilePath: String,

                      transfer: T => List[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])]

                     ): Unit = {

    hbaseBulkLoad(sc, tableName, tmpHFilePath, transfer, Type.DeleteColumn)

  }

 

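  // Executes the Gets produced by transfer in batches of `num` per partition
  // and returns the corresponding Results as a new RDD.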
  def hbaseGet(sc: SparkContext,

               tableName: String,

               transfer: T => Get,

               zkQuorum: String,

               zkPort: String,

               num: Int = 100

              ): RDD[Result] = {

    self.map(transfer).mapPartitions(it => {

      val lb = new ListBuffer[Result]()

      val batch = new util.ArrayList[Row]

      var realTable: Table = null

      val hbTableName = TableName.valueOf(tableName)

      var conn: Connection = null

      val hbConf = HBaseConfiguration.create()

      hbConf.set("hbase.zookeeper.property.clientPort", zkPort)

      hbConf.set("hbase.zookeeper.quorum", zkQuorum)

 

      try {

        conn = ConnectionFactory.createConnection(hbConf)

        realTable = conn.getTable(hbTableName)

        while (it.hasNext) {

          val get = it.next()

          get.setCacheBlocks(false)

          batch.add(get)

 

          if (batch.size >= num) {

            val results = new Array[Object](batch.size)

            realTable.batch(batch, results)

            results.foreach(x => {

              lb += x.asInstanceOf[Result]

            })

            // clear the batch so already-submitted Gets are not sent again
            batch.clear()

          }

        }

 

        if (batch.size() != 0) {

          val results = new Array[Object](batch.size)

          realTable.batch(batch, results)

          results.foreach(x => {

            lb += x.asInstanceOf[Result]

          })

        }

      } finally {

        if (realTable != null) {

          try {

            // close the HTable

            realTable.close()

          } catch {

            case e: IOException =>

              e.printStackTrace()

          }

        }

        if (conn != null) {

          try {

            // close the HBase connection

            conn.close()

          } catch {

            case e: IOException =>

              e.printStackTrace()

          }

        }

      }

      lb.iterator

    })

  }

 

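  // Convenience wrapper around hbaseMutation for Delete mutations.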
  def hbaseDelete(sc: SparkContext,

                  tableName: String,

                  transfer: T => Delete,

                  zkQuorum: String,

                  zkPort: String,

                  num: Int = 100

                 ): Unit = {

    hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)

  }

 

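  // Applies the mutations produced by transfer (Put/Delete/Increment, etc.) in
  // batches of `num` per partition, opening one connection per partition.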
  def hbaseMutation(sc: SparkContext,

                    tableName: String,

                    transfer: T => Mutation,

                    zkQuorum: String,

                    zkPort: String,

                    num: Int = 100

                   ): Unit = {

    self.foreachPartition(it => {

      var conn: Connection = null

      var realTable: Table = null

      val hbConf = HBaseConfiguration.create()

      hbConf.set("hbase.zookeeper.property.clientPort", zkPort)

      hbConf.set("hbase.zookeeper.quorum", zkQuorum)

      val hbTableName = TableName.valueOf(tableName)

      try {

        conn = ConnectionFactory.createConnection(hbConf)

        realTable = conn.getTable(hbTableName)

        val mutationList = new java.util.ArrayList[Mutation]

        while (it.hasNext) {

          mutationList.add(transfer(it.next()))

          if (mutationList.size >= num) {

            realTable.batch(mutationList, null)

            mutationList.clear()

          }

        }

        if (mutationList.size > 0) {

          realTable.batch(mutationList, null)

          mutationList.clear()

        }

      } finally {

        if (realTable != null) {

          try {

            // close the HTable

            realTable.close()

          } catch {

            case e: IOException =>

              e.printStackTrace()

          }

        }

        if (conn != null) {

          try {

            // close the HBase connection

            conn.close()

          } catch {

            case e: IOException =>

              e.printStackTrace()

          }

        }

      }

    })

 

 

  }

 

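  // Runs one Scan per element of the RDD (per partition, over a shared
  // connection) and collects all matching Results into the returned RDD.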
  def hbaseScan(sc: SparkContext,

                tableName: String,

                transfer: T => Scan,

                zkQuorum: String,

                zkPort: String

               ): RDD[Result] = {

 

    self.mapPartitions(it => {

      val lb = new ListBuffer[Result]()

      var conn: Connection = null

      var realTable: Table = null

      val hbConf = HBaseConfiguration.create()

      hbConf.set("hbase.zookeeper.property.clientPort", zkPort)

      hbConf.set("hbase.zookeeper.quorum", zkQuorum)

      val hbTableName = TableName.valueOf(tableName)

 

      try {

        conn = ConnectionFactory.createConnection(hbConf)

        realTable = conn.getTable(hbTableName)

        var scanner: ResultScanner = null

        try {

          while (it.hasNext) {

            val scan = transfer(it.next())

            scan.setCacheBlocks(false)

            scanner = realTable.getScanner(scan)

            val scannerIt = scanner.iterator

            while (scannerIt.hasNext) {

              val rs = scannerIt.next()

              lb += rs

            }

            // close each scanner once it has been consumed; otherwise only the
            // scanner from the last iteration would be closed in the finally below
            scanner.close()

            scanner = null

          }

        } finally {

          if (scanner != null) {

            scanner.close()

          }

        }

      } finally {

        if (realTable != null) {

          try {

            // close the HTable

            realTable.close()

          } catch {

            case e: IOException =>

              e.printStackTrace()

          }

        }

        if (conn != null) {

          try {

            // close the HBase connection

            conn.close()

          } catch {

            case e: IOException =>

              e.printStackTrace()

          }

        }

      }

      lb.iterator

    })

  }

 

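  // Convenience wrapper around hbaseMutation for Put mutations.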
  def hbasePut(sc: SparkContext,

               tableName: String,

               transfer: T => Put,

               zkQuorum: String,

               zkPort: String,

               num: Int = 100

              ): Unit = {

    hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)

  }

 

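  // Convenience wrapper around hbaseMutation for Increment mutations.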
  def hbaseIncrement(sc: SparkContext,

                     tableName: String,

                     transfer: T => Increment,

                     zkQuorum: String,

                     zkPort: String,

                     num: Int = 100

                    ): Unit = {

    hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)

  }

}

 

object HBaseRDDFunctions {

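  // Reads an entire table (restricted by the given Scan) through
  // TableInputFormat and returns an RDD of (ImmutableBytesWritable, Result).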
  def fromScan(sc: SparkContext,

               tableName: String,

               scan: Scan

              ): RDD[(ImmutableBytesWritable, Result)] = {

 

    scan.setCacheBlocks(false)

    val proto = ProtobufUtil.toScan(scan)

    val scanToString = Base64.encodeBytes(proto.toByteArray)

    val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

    hbConf.set(TableInputFormat.INPUT_TABLE, tableName)

    hbConf.set(TableInputFormat.SCAN, scanToString)

    sc.newAPIHadoopRDD(hbConf,

      classOf[TableInputFormat],

      classOf[ImmutableBytesWritable],

      classOf[Result]

    )

  }

 

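  // Implicit conversion that makes the methods of HBaseRDDFunctions
  // available on any RDD[T] once this object's members are imported.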
  implicit def rddToHBaseRDD[T](rdd: RDD[T]): HBaseRDDFunctions[T] = {

    new HBaseRDDFunctions(rdd)

  }

}

Note: HBase version 1.0.2, Spark version 2.1.0.

 

 

Usage:

1. Import the implicit conversion function:
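A minimal sketch of the import, assuming HBaseRDDFunctions lives under a package such as com.example.hbase (the package name is an assumption, not part of the original code):

// brings the implicit rddToHBaseRDD conversion into scope;
// com.example.hbase is a placeholder package name
import com.example.hbase.HBaseRDDFunctions._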

 

2. After the import, Spark RDDs are extended with the Spark on HBase methods developed above (hbasePut, hbaseGet, hbaseScan, hbaseBulkLoad, and so on), as in the usage sketch below:
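A minimal usage sketch, assuming the same placeholder package com.example.hbase; the table name, column family and qualifier, ZooKeeper quorum and port, and temp HFile path are placeholders as well:

import org.apache.hadoop.hbase.client.{Put, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

// placeholder package; import from wherever HBaseRDDFunctions actually lives
import com.example.hbase.HBaseRDDFunctions
import com.example.hbase.HBaseRDDFunctions._

object HBaseRDDDemo {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HBaseRDDDemo"))
    val rdd = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

    // online writes: each element becomes a Put, submitted in batches of 100
    rdd.hbasePut(sc, "demo_table", kv => {
      val put = new Put(Bytes.toBytes(kv._1))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(kv._2))
      put
    }, "zk1,zk2,zk3", "2181")

    // bulk load: each element yields (rowKey, family, qualifier, value) cells,
    // written as HFiles under the temp path and then bulk-loaded into the table
    rdd.hbaseBulkLoad(sc, "demo_table", "/tmp/demo_hfiles", kv => {
      List((Bytes.toBytes(kv._1), Bytes.toBytes("cf"),
        Bytes.toBytes("q"), Bytes.toBytes(kv._2)))
    })

    // full-table scan through TableInputFormat; fromScan reads the HBase
    // connection settings from sc.hadoopConfiguration (hbase-site.xml)
    val results = HBaseRDDFunctions.fromScan(sc, "demo_table", new Scan())
    println(results.count())

    sc.stop()
  }
}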
