Kafka Consumer多线程消费实战和源码分析
根据官方文档我们知道KafkaConsumer是线程不安全的,KafkaProducer是线程安全的。接下来我们就来讨论为什么KafkaConsumer线程不安全
1、kafka的消费者和分区的关系:
topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,
即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。
如图,当consumer1进行一次commit操作时,会同时提交分区0、1的offset信息;
当consumer2 进行一次commit操作时,只会提交分区2的offset信息;
2、多线程消费kafka数据
根据消费者和分区关系,在结合多线程具体使用场景有:多线程处理一个消费者只消费一个分区;多线程处理一个消费者消费多个分区。对应模型图为:
2.1 分析场景2
线程1获得消费者,消费者进行提交操作,然后将分区2的offset+1,完成了本次的提交操作,这里实现了分区和线程关系的绑定,是线程安全的。
代码实现:
2.2 分析场景1
当线程1获取分区1的数据数据,分区2没有数据时就是场景2的场景不会有任何问题。但是当线程1获取分区1的数据数据,为提交时;线程2获取分区2的数据数据开始消费,这时分区1的offset并没有加1,这里就会有问题了。已做图方式进行解释:
可以看到,当分区数据都被消费后,分区信息的offset应该为(1,1),但实际的分区offset为(1,0)或者(0,1)这个就出现线程不安全问题。
代码实现:
2.2.1 源码分析
Kafka 的consumer有进行一定检查机制,先对进行部分源码分析:
从consumer的subscribe进行分析:
可见首先进行调用acquireAndEnsureOpen(),
根据调用链路可见,当线程1进入,首先获取线程id,然后进行线程id对比。由于初始化线程id为NO_CURRENT_THREAD = -1L;
进行操作1判断,线程id肯定不等于-1。进行操作2,通过一个cas操作,将currenThread的值更新成线程1id;
当线程2进行,首先获取线程id,然后进行线程id对比。进行操作1判断,线程2id肯定不等于线程1id,
进行操作2,通过一个cas操作,发现将currenThread的值为线程1id并不是NO_CURRENT_THREAD 所有抛出异常。
注:根据源码分析,consumer的subscribe和commitSync需要在同一个线程中进行操作,否则会线程异常(之前subscribe我写在主线程中,然后在子线程中commitSync。。。蠢哭了)
2.2.2 解决场景1问题
解决的基本思路:就是在当多线程提交时,每个线程一次只提交自己消费分区的offset,其他分区信息不进行提交。
本例中包含3个类:
- ConsumerThreadHandler类:consumer多线程的管理类,用于创建线程池以及为每个线程分配任务。另外consumer位移的提交也在这个类中进行
- ConsumerWorker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler
- Main类:测试主方法类
-
大神问题解决源码的参考文档:https://www.bbsmax.com/A/l1dyPVY65e/