Kafka Consumer多线程消费实战和源码分析

根据官方文档我们知道KafkaConsumer是线程不安全的,KafkaProducer是线程安全的。接下来我们就来讨论为什么KafkaConsumer线程不安全

1、kafka的消费者和分区的关系:

topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,

即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。

Kafka Consumer多线程消费实战和源码分析

如图,当consumer1进行一次commit操作时,会同时提交分区0、1的offset信息;

           当consumer2 进行一次commit操作时,只会提交分区2的offset信息;

2、多线程消费kafka数据

     根据消费者和分区关系,在结合多线程具体使用场景有:多线程处理一个消费者只消费一个分区;多线程处理一个消费者消费多个分区。对应模型图为:

Kafka Consumer多线程消费实战和源码分析

2.1 分析场景2

 线程1获得消费者,消费者进行提交操作,然后将分区2的offset+1,完成了本次的提交操作,这里实现了分区和线程关系的绑定,是线程安全的。

代码实现:

Kafka Consumer多线程消费实战和源码分析

2.2 分析场景1

   当线程1获取分区1的数据数据,分区2没有数据时就是场景2的场景不会有任何问题。但是当线程1获取分区1的数据数据,为提交时;线程2获取分区2的数据数据开始消费,这时分区1的offset并没有加1,这里就会有问题了。已做图方式进行解释:

Kafka Consumer多线程消费实战和源码分析

可以看到,当分区数据都被消费后,分区信息的offset应该为(1,1),但实际的分区offset为(1,0)或者(0,1)这个就出现线程不安全问题。

代码实现:

Kafka Consumer多线程消费实战和源码分析

2.2.1 源码分析

Kafka 的consumer有进行一定检查机制,先对进行部分源码分析:

从consumer的subscribe进行分析:

Kafka Consumer多线程消费实战和源码分析

可见首先进行调用acquireAndEnsureOpen(),

Kafka Consumer多线程消费实战和源码分析

根据调用链路可见,当线程1进入,首先获取线程id,然后进行线程id对比。由于初始化线程id为NO_CURRENT_THREAD = -1L;

进行操作1判断,线程id肯定不等于-1。进行操作2,通过一个cas操作,将currenThread的值更新成线程1id;

                               当线程2进行,首先获取线程id,然后进行线程id对比。进行操作1判断,线程2id肯定不等于线程1id,

                                                       进行操作2,通过一个cas操作,发现将currenThread的值为线程1id并不是NO_CURRENT_THREAD 所有抛出异常。

注:根据源码分析,consumer的subscribe和commitSync需要在同一个线程中进行操作,否则会线程异常(之前subscribe我写在主线程中,然后在子线程中commitSync。。。蠢哭了)

 

2.2.2 解决场景1问题

    解决的基本思路:就是在当多线程提交时,每个线程一次只提交自己消费分区的offset,其他分区信息不进行提交。

 

 本例中包含3个类:

 

  • ConsumerThreadHandler类:consumer多线程的管理类,用于创建线程池以及为每个线程分配任务。另外consumer位移的提交也在这个类中进行
  • ConsumerWorker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler
  • Main类:测试主方法类
  • Kafka Consumer多线程消费实战和源码分析

  •         大神问题解决源码的参考文档:https://www.bbsmax.com/A/l1dyPVY65e/