librdkafka: How to Set the Starting Offset of a Kafka Consumer

Default configuration

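By default, a Kafka consumer resumes from the last committed offset. If the consumer group has never committed an offset for a given topic partition, or the committed offset is out of range, the starting position is decided by the auto.offset.reset property. Its default is latest: consumption starts at the end of the partition, so only newly produced messages are consumed. (librdkafka treats auto.offset.reset as a topic-level property; in the Java client it is a consumer property.) See the official documentation for the related settings: https://kafka.apache.org/documentation/#topicconfigs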
For quick reference, here is a screenshot:
[Screenshot of the Kafka configuration documentation]

Relevant APIs

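librdkafka provides the assign() API: by setting the .offset field of rd_kafka_topic_partition_t, you can specify the starting offset of each partition. The offset can be an absolute offset (>= 0) or a logical offset (BEGINNING, END, STORED, TAIL(…)).
The rdkafka.h header defines rd_kafka_topic_partition_t, the per-partition structure that carries the offset, together with the logical offset constants RD_KAFKA_OFFSET_XXX.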

/**
 * @brief Generic place holder for a specific Topic+Partition.
 *
 * @sa rd_kafka_topic_partition_list_new()
 */
typedef struct rd_kafka_topic_partition_s {
        char        *topic;             /**< Topic name */
        int32_t      partition;         /**< Partition */
        int64_t      offset;            /**< Offset */
        void        *metadata;          /**< Metadata */
        size_t       metadata_size;     /**< Metadata size */
        void        *opaque;            /**< Application opaque */
        rd_kafka_resp_err_t err;        /**< Error code, depending on use. */
        void       *_private;           /**< INTERNAL USE ONLY,
                                         *   INITIALIZE TO ZERO, DO NOT TOUCH */
} rd_kafka_topic_partition_t;

////////////////////////////////////////////////////////////
#define RD_KAFKA_OFFSET_BEGINNING -2  /**< Start consuming from beginning of
				       *   kafka partition queue: oldest msg */
#define RD_KAFKA_OFFSET_END       -1  /**< Start consuming from end of kafka
				       *   partition queue: next msg */
#define RD_KAFKA_OFFSET_STORED -1000  /**< Start consuming from offset retrieved
				       *   from offset store */
#define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */


/** @cond NO_DOC */
#define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */
/** @endcond */

/**
 * @brief Start consuming \p CNT messages from topic's current end offset.
 *
 * That is, if current end offset is 12345 and \p CNT is 200, it will start
 * consuming from offset \c 12345-200 = \c 12145. */
#define RD_KAFKA_OFFSET_TAIL(CNT)  (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))

The rd_kafka_assign() function sets the partitions the consumer should consume.

/**
 * @brief Atomic assignment of partitions to consume.
 *
 * The new \p partitions will replace the existing assignment.
 *
 * When used from a rebalance callback the application shall pass the
 * partition list passed to the callback (or a copy of it) (even if the list
 * is empty) rather than NULL to maintain internal join state.

 * A zero-length \p partitions will treat the partitions as a valid,
 * albeit empty, assignment, and maintain internal state, while a \c NULL
 * value for \p partitions will reset and clear the internal state.
 */
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_assign (rd_kafka_t *rk,
                 const rd_kafka_topic_partition_list_t *partitions);

How to set the offset

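A consumer can change the starting offset of its partitions in two places: when it assigns partitions directly at startup, and in the consumer group's rebalance callback. Each is described below.
Specifying the offset at startup: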

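rd_kafka_topic_partition_list_t *partitions;
rd_kafka_resp_err_t err;

/* Consume partition 3 of "mytopic", starting at absolute offset 1234 */
partitions = rd_kafka_topic_partition_list_new(0);
rd_kafka_topic_partition_list_add(partitions, "mytopic", 3)->offset = 1234;
if ((err = rd_kafka_assign(rk, partitions)))
        fprintf(stderr, "assign failed: %s\n", rd_kafka_err2str(err));
/* rd_kafka_assign() copies the list, so it can be freed right away */
rd_kafka_topic_partition_list_destroy(partitions);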

Specifying the offset in the rebalance_cb() callback:

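/* Assumes the default eager rebalance protocol */
void my_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *partitions, void *opaque) {
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                rd_kafka_topic_partition_t *part;
                /* Override the start offset of mytopic [3] only; the other
                 * partitions start from their committed offset */
                if ((part = rd_kafka_topic_partition_list_find(partitions, "mytopic", 3)))
                        part->offset = 1234;
                /* Pass the (modified) list, not NULL, to keep the join state */
                rd_kafka_assign(rk, partitions);
        } else {
                /* Partitions revoked, or an error: drop the assignment */
                rd_kafka_assign(rk, NULL);
        }
}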
