从源码理解 Kafka 的分区选择策略[新的生产者发送消息分区选择]

Kafka 中将 Topic 分为 partition,消费者从 partition 中消费消息。消息是怎么确定发住哪个 partition 呢?其实默认有两种分区选择策略:

  1. 消息 key 为空时随机选择
  2. 消息 key 不为空时,对 key 进行 HASH,然后对分区数取模

源码分析

在 KafkaProducer 的 doSend 方法中调用了以下方法进行分区选择,如果指定了分区,则直接使用指定的分区。如果没有指定则通过默认的 partitioner 来计算出分区。

1
2
3
4
5
6
7
8
// org.apache.kafka.clients.producer.KafkaProducer#partition
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

默认使用的分区处理类是 DefaultPartitioner,其内的实现如下:

1
2
3
4
5
6
7
8
9
10
// org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    } 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

如果提供了 key 值,则用 murmur2 的方法对 key 进行 HASH 并对 numPartitions(Topic 的分区数)取模。

当 Key 为空时,通过 stickyPartitionCache 的 partition 方法计算出分区。StickyPartitionCache 是 Kafka Client 内部的一个类,用于管理 Topic 的分区选择的逻辑和缓存。

1
2
3
4
5
6
7
8
// org.apache.kafka.clients.producer.internals.StickyPartitionCache#partition
public int partition(String topic, Cluster cluster) {
    Integer part = indexCache.get(topic);
    if (part == null) {
        return nextPartition(topic, cluster, -1);
    }
    return part;
}

indexCache 是一个 ConcurrentHashMap 对象,对应的是 Topic -> Partition 的映射,如果该值不存在则调用 nextPartition 方法选择一个分区并缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// org.apache.kafka.clients.producer.internals.StickyPartitionCache#nextPartition
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that 
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}

第一个分支条件 oldPart == null || oldPart == prevPartition 中:

  1. oldPart == null 表示没有分区缓存,对应着新增 topic 或第一次调用的情况
  2. oldPart == prevPartition 当创建了新的 Batch 时触发了此方法的情况,对分区缓存进行更新

满足以上条件之一就进入真正选择分区的逻辑。

关于 Batch:

  1. 每个 batch 的数据属于同一个 partition
  2. Sticker Partitioner 是 Kafka 对空 key 的分区选择进行的优化,尽量在一个 Batch 中提交多几条数据。当 Batch 满或 linger.ms 时间到 才触发选择新的分区,在这之前,所有消息都会发到缓存的分区。原来的逻辑会每条消息都选择新的分区,可能造成很多 batch 太小,客户端请求过多,降低呑吐

从源码理解 Kafka 的分区选择策略[新的生产者发送消息分区选择]

后面的逻辑根据可用分区数进行处理,决定新的分区:

  1. 如果无可用分区,从所有分区里随机选择一个分区
  2. 只有一个可用分区,直接选用该分区
  3. 多个可用分区,随机取一个不等于当前分区的可用分区

结论

  1. 消息 key 为空时,如果有缓存分区,使用缓存分区,没有缓存则随机选择
  2. 消息 key 不为空时,对 key 进行 HASH,然后对分区数取模

参考:

kafka生产者分区优化
Apache Kafka Producer Improvements with the Sticky Partitioner