spark-streaming消费kafka写入hbase踩坑实战

场景

otter同步mysql的数据到kafka,spark-streaming消费kafka,采用motator异步写hbase。mysql的insert,update,delete特别频繁,每秒都有大量的insert,delete,update

 

以下都是基于同一条数据操作

1.  坑 Delete(byte [] row), Put(byte[] row),

bufferedmutator写hbase的时候 同一条数据先 delete 再put ,

遇到的问题:

hbase显示没这个数据;就是bufferedmutator处理数据乱序了(每次测试都是hbase找不到数据)

解决的途径:

遇坑后,百度找到https://developer.aliyun.com/ask/129312?spm=a2c6h.13159736这篇文章。意思就是new 对象 加上时间戳(增加ms级别极端情况判断,对后者加1ms,见case5)

2.坑 Delete(byte [] row, long timestamp) ,Put(byte[] row, long ts) 

代码里时间戳使用System.currentTimeMillis()

遇到的问题:

spark-streaming处理数据的时候,一个批次rdd遍历起来很快,难免两个System.currentTimeMillis()一样,此时就和方法1一样,达不到先delete,后insert的效果,依然没有解决(反复测试,确实有两个时间戳一样)

解决的途径:

然后又找到更细粒度的纳秒,System.nanoTime()作为参数传递

 

 

3.坑 Delete(byte [] row, long timestamp) ,Put(byte[] row, long ts) 

代码里使用System.nanoTime() 作为参数传到delete和put对象里,写hbase是没问题的,此时看似解决问题了,但是又有其他新问题出现,写入hbase的timestamp的值就是System.nanoTime() 的返回值

以前对这个System.nanoTime()不了解,百度找了资料了解了点https://www.cnblogs.com/andy-songwei/p/10784049.html

遇到的问题:

1:HTable设置的ttl是根据System.currentTimeMillis()判断的,达不到想要的效果

2:System.nanoTime()这个返回值是 返回正在运行的Java虚拟机的高分辨率时间源的当前值,以纳秒计。还是随机的

3:System.nanoTime()不能与java.util.Date转换

4:服务器重启,System.nanoTime()的返回值与以前的返回值没有关系,可能比之前的值更小,那么hbase 的数据就不能操作了

5:还了解了hbase的时间戳 https://blog.csdn.net/inte_sleeper/article/details/11689059

每次操作的时间戳必须比以前旧数据的时间戳大,否则数据不会变化

4.坑 hbase-issues里找到1所说的bug

https://issues.apache.org/jira/browse/HBASE-8626?focusedCommentId=13669455&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13669455

问题描述:

spark-streaming消费kafka写入hbase踩坑实战

解决方法:

spark-streaming消费kafka写入hbase踩坑实战

 

5.代码参照4方法优化

val keyTimesMap = new mutable.HashMap[String,Long]()

可变集合 保存 key和key对应的时间戳

在event_type是delete 的时候调用下边方法,在event_class不是delete的时候也调用这个方法

spark-streaming消费kafka写入hbase踩坑实战