3.深入理解kafka:核心设计与实践原理

主题与分区

主题的管理

主题的管理包括创建主题、查看主题信息、修改主题和删除主题等操作。主题的管理并非只有使用 kafka-topics.sh 脚本这一种方式,我们还可以通过KafkaAdminClient 的方式实现

创建主题

如果broker端配置参数auto.create.topics.enable设置为true(默认值就是true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions (默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。很多时候,这种自动创建主题的行为都是非预期的。除非有特殊应用需求,否则不建议将auto.create.topics.enable参数设置为true,这个参数会增加主题的管理与维护的难度。

主题、分区、副本和 Log(日志)的关系如图 所示

3.深入理解kafka:核心设计与实践原理

我们不仅可以通过日志文件的根目录来查看集群中各个broker的分区副本的分配情况,还可以通过ZooKeeper客户端来获取。当创建一个主题时会在ZooKeeper的/brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案。示例如下:3.深入理解kafka:核心设计与实践原理

到目前为止,创建主题时的分区副本都是按照既定的内部逻辑来进行分配的。kafka-topics.sh脚本中还提供了一个 replica-assignment 参数来手动指定分区副本的分配方案。replica-assignment参数的用法归纳如下:

3.深入理解kafka:核心设计与实践原理

这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用逗号“,”隔开,分区内多个副本用冒号“:”隔开。并且在使用replica-assignment参数创建主题时不需要原本必备的partitions和replication-factor这两个参数。

如果分区之间所指定的副本数不同,比如0:1,0,1:0这种,就会报出AdminOperationException异常.当然,类似0:1,,0:1,1:0这种企图跳过一个分区的行为也是不被允许的 

 在创建主题时我们还可以通过config参数来设置所要创建主题的相关参数,通过这个参数可以覆盖原本的默认配置。在创建主题时可以同时设置多个参数,具体的用法归纳如下:3.深入理解kafka:核心设计与实践原理

3.深入理解kafka:核心设计与实践原理

创建主题同名的时候会报错,常加上if-not-exist参数

kafka-topics.sh 脚本实质上是调用了 kafka.admin.TopicCommand 类,通过向 TopicCommand 类中传入一些关键参数来实现主题的管理。我们也可以直接调用TopicCommand类中的main()函数来直接管理主题,不过更推荐使用KafkaAdminClient来代替这种实现方式。3.深入理解kafka:核心设计与实践原理3.深入理解kafka:核心设计与实践原理

查看主题

list和describe指令可以用来方便地查看主题信息

通过list指令可以查看当前所有可用的主题

3.深入理解kafka:核心设计与实践原理

前面的章节我们都是通过 describe 指令来查看单个主题信息的,如果不使用--topic指定主题,则会展示出所有主题的详细信息。--topic还支持指定多个主题

3.深入理解kafka:核心设计与实践原理

在使用 describe 指令查看主题信息时还可以额外指定 topics-with-overrides、under-replicated-partitions和unavailable-partitions这三个参数来增加一些附加功能。增加topics-with-overrides参数可以找出所有包含覆盖配置的主题,它只会列出包含了与集群不一样配置的主题。注意使用topics-with-overrides参数时只显示原本只使用describe指令的第一行信息

under-replicated-partitions和unavailable-partitions参数都可以找出有问题的分区。通过under-replicated-partitions 参数可以找出所有包含失效副本的分区。

修改主题

kafka-topics.sh脚本中的alter指令提供修改分区个数,修改配置等.

示例如下:

3.深入理解kafka:核心设计与实践原理

注意上面提示的告警信息:当主题中的消息包含key时(即key不为null),根据key计算分区的行为就会受到影响。当topic-config的分区数为1时,不管消息的key为何值,消息都会发往这一个分区;当分区数增加到3时,就会根据消息的key来计算分区号,原本发往分区0的消息现在有可能会发往分区1或分区2。如此还会影响既定消息的顺序,所以在增加分区数时一定要三思而后行。对于基于key计算的主题而言,建议在一开始就设置好分区数量,避免以后对其进行调整。目前Kafka只支持增加分区数而不支持减少分区数。比如我们再将主题topic-config的分区数修改为1,就会报出InvalidPartitionException的异常

 为什么不支持减少分区?

实现此功能需要考虑的因素很多,比如删除的分区中的消息该如何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。

配置管理

kafka-configs.sh 脚本是专门用来对配置进行操作的,这里的操作是指在运行状态下修改原有的配置,如此可以达到动态变更的目的。

kafka-configs.sh脚本不仅可以支持操作主题相关的配置,还可以支持操作broker、用户和客户端这3个类型的配置。kafka-configs.sh脚本使用entity-type参数来指定操作配置的类型,并且使用entity-name参数来指定操作配置的名称。比如查看主题topic-config的配置可以按如下方式执行:

3.深入理解kafka:核心设计与实践原理

3.深入理解kafka:核心设计与实践原理

删除主题 

如果确定不再使用一个主题,那么最好的方式是将其删除,这样可以释放一些资源,比如磁盘、文件句柄等。kafka-topics.sh脚本中的delete指令就可以用来删除主题,可以看到在执行完删除命令之后会有相关的提示信息,这个提示信息和broker端配置参数delete.topic.enable 有关。必须将delete.topic.enable参数配置为true才能够删除主题,这个参数的默认值就是true,如果配置为false,那么删除主题的操作将会被忽略。在实际生产环境中,建议将这个参数的值设置为true。

v

3.深入理解kafka:核心设计与实践原理

尝试删除一个不存在的主题也会报错。比如下面的示例中尝试删除一个不存在的主题topic-unknown,需要使用if-exsit

使用kafka-topics.sh脚本删除主题的行为本质上只是在ZooKeeper中的/admin/delete_topics路径下创建一个与待删除主题同名的节点,以此标记该主题为待删除的状态。与创建主题相同的是,真正删除主题的动作也是由Kafka的控制器负责完成的。了解这一原理之后,我们可以直接通过ZooKeeper的客户端来删除主题。下面示例中使用ZooKeeper客户端zkCli.sh来删除主题topic-delete:3.深入理解kafka:核心设计与实践原理

3.深入理解kafka:核心设计与实践原理


初识KafkaAdminClient

集管理、监控、运维、告警为一体的生态平台

KafkaAdminClient不仅可以用来管理broker、配置和ACL(Access Control List),还可以用来管理主题.其部分相关方法:

3.深入理解kafka:核心设计与实践原理

3.深入理解kafka:核心设计与实践原理

 主题合法性验证

为了防止程序员错误或者不规范,创建了不适合的主题等,

Kafka broker 端有一个这样的参数:create.topic.policy.class.name,默认值为null,它提供了一个入口用来验证主题创建的合法性。使用方式很简单,只需要自定义实现org.apache.kafka.server.policy.CreateTopicPolicy 接口,比如下面示例中的 PolicyDemo。然后在broker 端的配置文件 config/server.properties 中配置参数create.topic.policy.class.name的值为org.apache.kafka.server.policy.PolicyDemo,最后启动服务。

,主要实现接口中的configure()、close()及validate()方法,configure()方法会在Kafka服务启动的时候执行,validate()方法用来鉴定主题参数的合法性,其在创建主题时执行,close()方法在关闭Kafka服务时执行。

代码示例:

3.深入理解kafka:核心设计与实践原理


分区的管理

优先副本的选举

分区使用多副本机制来提升可靠性,但只有leader副本对外提供读写服务,而follower副本只负责在内部进行消息的同步。如果一个分区的leader副本不可用,那么就意味着整个分区变得不可用,此时就需要Kafka从剩余的follower副本中挑选一个新的leader副本来继续对外提供服务。虽然不够严谨,但从某种程度上说,broker 节点中 leader 副本个数的多少决定了这个节点负载的高低。

为了能够有效地治理负载失衡的情况,Kafka引入了优先副本(preferred replica)的概念。所谓的优先副本是指在 AR 集合列表中的第一个副本。

在 Kafka 中可以提供分区自动平衡的功能,与此对应的 broker 端参数是auto.leader.rebalance.enable,此参数的默认值为true,即默认情况下此功能是开启的。如果开启分区自动平衡的功能,则 Kafka 的控制器会启动一个定时任务,这个定时任务会轮询所有的 broker节点,计算每个broker节点的分区不平衡率(broker中的不平衡率=非优先副本的leader个数/分区总数)是否超过leader.imbalance.per.broker.percentage参数配置的比值,默认值为 10%,如果超过设定的比值则会自动执行优先副本的选举动作以求分区平衡。执行周期由参数leader.imbalance.check.interval.seconds控制,默认值为300秒,即5分钟。

不过在生产环境中不建议将auto.leader.rebalance.enable设置为默认的true,因为这可能引起负面的性能问题,也有可能引起客户端一定时间的阻塞。因为执行的时间无法自主掌控,如果在关键时期(比如电商大促波峰期)执行关键任务的关卡上执行优先副本的自动选举操作,势必会有业务阻塞、频繁超时之类的风险。前面也分析过,分区及副本的均衡也不能完全确保集群整体的均衡,并且集群中一定程度上的不均衡也是可以忍受的,为防止出现关键时期“掉链子”的行为,笔者建议还是将掌控权把控在自己的手中,可以针对此类相关的埋点指标设置相应的告警,在合适的时机执行合适的操作,而这个“合适的操作”就是指手动执行分区平衡。Kafka中kafka-perferred-replica-election.sh脚本提供了对分区leader副本进行重新平衡的功能

在实际生产环境中,一般使用 path-to-json-file 参数来分批、手动地执行优先副本的选举操作。尤其是在应对大规模的 Kafka 集群时,理应杜绝采用非 path-to-json-file参数的选举操作方式。同时,优先副本的选举操作也要注意避开业务高峰期,以免带来性能方面的负面影响。

分区重分配

Kafka提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在集群扩容、broker节点失效的场景下对分区进行迁移。

kafka-reassign-partitions.sh 脚本的使用分为 3 个步骤:首先创建需要一个包含主题清单的JSON 文件,其次根据主题清单和 broker 节点清单生成一份重分配方案,最后根据这份方案执行具体的重分配动作。

分区重分配对集群的性能有很大的影响,需要占用额外的资源,比如网络和磁盘。在实际操作中,我们将降低重分配的粒度,分成多个小批次来执行,以此来将负面的影响降到最低,这一点和优先副本的选举有异曲同工之妙。还需要注意的是,如果要将某个broker下线,那么在执行分区重分配动作之前最好先关闭或重启broker。这样这个broker就不再是任何分区的leader节点了,它的分区就可以被分配给集群中的其他broker。这样可以减少broker间的流量复制,以此提升重分配的性能,以及减少对集群的影响。

复制限流