Apache Kafka系列(四) 多线程Consumer方案

http://www.cnblogs.com/qizhelongdeyang/p/7355309.html

本文的图片是通过PPT截图出的,读者如果修改意见请联系我

一、Consumer为何需要实现多线程

  假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息。该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用API写入消息到Kafka Cluster中,然后消息的订阅者通过Consumer读取消息,刚开始的时候系统架构图如下:

Apache Kafka系列(四) 多线程Consumer方案

          但是,随着用户数量的增多,通知的数据也会对应的增长。总会达到一个阈值,在这个点上,Producer产生的数量大于Consumer能够消费的数量。那么Broker中未消费的消息就会逐渐增多。即使Kafka使用了优秀的消息持久化机制来保存未被消费的消息,但是Kafka的消息保留机制限制(时间,分区大小,消息Key)也会使得始终未被消费的Message被永久性的删除。另一方面从业务上讲,一个消息通知系统的高延迟几乎算作是废物了。所以多线程的Consumer模型是非常有必要的。

二、多线程的Kafka Consumer 模型类别

  基于Consumer的多线程模型有两种类型:

  • 模型一:多个Consumer且每一个Consumer有自己的线程,对应的架构图如下:

                         Apache Kafka系列(四) 多线程Consumer方案

 

  • 模型二:一个Consumer且有多个Worker线程

                         Apache Kafka系列(四) 多线程Consumer方案

     两种实现方式的优点/缺点比较如下:

名称 优点 缺点
模型一

1.Consumer Group容易实现

2.各个Partition的顺序实现更容易

1.Consumer的数量不能超过Partition的数量,否则多出的Consumer永远不会被使用到

2.因没个Consumer都需要一个TCP链接,会造成大量的系统性能损耗

模型二 1.由于通过线程池实现了Consumer,横向扩展更方便

1.在每个Partition上实现顺序处理更困难。

例如:同一个Partition上有两个待处理的Message需要被线程池中的2个线程消费掉,那这两个线程必须实现同步

三、代码实现

3.1 前提

    • Kafka Broker 0.11.0
    • JDK1.8
    • IDEA
    • Maven3
    • Kafka环境搭建及Topic创建修改等请参照本系列的前几篇文章。

 3.2 源码结构

                 Apache Kafka系列(四) 多线程Consumer方案

       其中,consumergroup包下面对应的是模型一的代码,consumerthread包下是模型二的代码。ProducerThread是生产者代码。

 3.3 pom.xml

Apache Kafka系列(四) 多线程Consumer方案

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.randy</groupId>
  <artifactId>kafka_multithread_consumer_model</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>kafka_multithread_consumer_model Maven Webapp</name>
  <url>http://maven.apache.org</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>kafka_multithread_consumer_model</finalName>
  </build>
</project>

Apache Kafka系列(四) 多线程Consumer方案

 3.4 方案一:Consumer Group

  ProducerThread.java是一个生产者线程,发送消息到Broker

  ConsumerThread.java是一个消费者线程,由于消费消息

  ConsumerGroup.java用于产生一组消费者线程

  ConsumerGroupMain.java是入口类     

3.4.1 ProducerThread.java 

Apache Kafka系列(四) 多线程Consumer方案 View Code

 3.4.2 ConsumerThread.java

Apache Kafka系列(四) 多线程Consumer方案 View Code

3.4.3 ConsumerGroup.java

Apache Kafka系列(四) 多线程Consumer方案 View Code

3.4.4 ConsumerGroupMain.java  

Apache Kafka系列(四) 多线程Consumer方案 View Code

3.5 方案二:多线程的Consumer

  ConsumerThreadHandler.java用于处理发送到消费者的消息

  ConsumerThread.java是消费者使用线程池的方式初始化消费者线程

  ConsumerThreadMain.java是入口类

3.5.1 ConsumerThreadHandler.java

Apache Kafka系列(四) 多线程Consumer方案 View Code

3.5.2 ConsumerThread.java

Apache Kafka系列(四) 多线程Consumer方案 View Code

3.5.3 ConsumerThreadMain.java

Apache Kafka系列(四) 多线程Consumer方案 View Code

四. 总结

  本篇文章列举了两种不同的消费者模式。两者各有利弊。所有代码都上传到了https://github.com/qizhelongdeyang/kafka_multithread_consumer_model.git ,如有疑问或者错误请指正