Why does my consumer read all messages from the topic every time, even with auto.offset.reset=largest?
Problem description:
I have produced 5 messages to Kafka on topic1 and consumed them successfully. When I produce a 6th message and try to consume, I get all 6 messages again instead of only the latest (6th) one. Why does my consumer read all messages from the topic every time, even though auto.offset.reset is set to largest?
Note that I am running the consumer from the command line, not through the database connector (the Access module). The connector has the configuration property auto.offset.reset set to "largest" (see all the configuration properties from the log below).
Also see the OffsetChecker output below:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
--group testjob --zookeeper localhost:2181 --topic topic1
[2017-07-06 21:57:46,707] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/testjob/offsets/topic1/0.
Can anyone tell me where the problem is?
Here is the log showing the configuration properties:
***Global config Properties***
* client.id = rdkafka
* message.max.bytes = 1200
* receive.message.max.bytes = 100000000
* metadata.request.timeout.ms = 60000
* topic.metadata.refresh.interval.ms = 600000
* topic.metadata.refresh.fast.cnt = 10
* topic.metadata.refresh.fast.interval.ms = 250
* topic.metadata.refresh.sparse = false
* socket.timeout.ms = 60000
* socket.send.buffer.bytes = 0
* socket.receive.buffer.bytes = 0
* socket.keepalive.enable = false
* socket.max.fails = 3
* broker.address.ttl = 300000
* broker.address.family = any
* statistics.interval.ms = 0
* log_cb = 0x7fecb80c6dd0
* log_level = 6
* socket_cb = 0x7fecb80cd2f0
* open_cb = 0x7fecb80ddd30
* opaque = 0x2641280
* internal.termination.signal = 0
* queued.min.messages = 100000
* queued.max.messages.kbytes = 1000000
* fetch.wait.max.ms = 100
* fetch.message.max.bytes = 1049776
* fetch.min.bytes = 1
* fetch.error.backoff.ms = 500
* group.id = testjob
* queue.buffering.max.messages = 100000
* queue.buffering.max.ms = 1000
* message.send.max.retries = 2
* retry.backoff.ms = 100
* compression.codec = none
* batch.num.messages = 1000
* delivery.report.only.error = false
* request.required.acks = 1
* enforce.isr.cnt = 0
* request.timeout.ms = 5000
* message.timeout.ms = 300000
* produce.offset.report = false
* auto.commit.enable = true
* auto.commit.interval.ms = 60000
* auto.offset.reset = largest <<<<--------
* offset.store.path = .
* offset.store.sync.interval.ms = 0
* offset.store.method = file
* consume.callback.max.messages = 0
Answer
Adding the property AUTO_OFFSET_RESET_CONFIG = "earliest" will work.
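AUTO_OFFSET_RESET_CONFIG is the Java client's constant for the auto.offset.reset property. The log in the question comes from librdkafka, so in a librdkafka-based client the equivalent change might look like the following sketch; the broker address and the confluent_kafka package are assumptions for illustration, not details from the question:

```python
# Sketch of a librdkafka-style consumer configuration with the suggested fix.
# Only group.id and the auto.commit settings mirror the log in the question;
# bootstrap.servers is an assumed broker address.
conf = {
    "bootstrap.servers": "localhost:9092",  # assumption, not from the question
    "group.id": "testjob",                  # group from the question's log
    "auto.commit.enable": True,             # as in the log
    "auto.commit.interval.ms": 60000,       # as in the log
    # The suggested fix: when the group has no committed offset,
    # start from the beginning of the partition.
    "auto.offset.reset": "earliest",
}

# With the confluent_kafka package (assumption) this dict would be used as:
#   from confluent_kafka import Consumer
#   consumer = Consumer(conf)
#   consumer.subscribe(["topic1"])
print(conf["auto.offset.reset"])
```

Note that "earliest" makes the consumer re-read the whole topic whenever no committed offset exists, which is the behavior the question describes; whether that is the desired fix depends on whether offsets are actually being committed.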
How are you running your consumer? The full command line might help diagnose what is going wrong. – amethystic
Without knowing your consumer I can only guess. Maybe your consumer runs for less than 60 s (auto.commit.interval.ms) and is killed rather than shut down cleanly, so it never commits an offset. About the missing node in ZooKeeper: possibly you are running a "new consumer", which does not commit offsets to ZK. Or you are not writing to the root path of ZK (which I would recommend checking). Check your broker configuration (zookeeper.connect). It may look like 'localhost:2181/kafka'; in that case you have to append that path to the ZK connection string when running the offset checker. – TobiSH
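The key point behind this comment is that auto.offset.reset only applies when the group has no committed offset; a committed offset always takes precedence. A minimal Python sketch of that resolution logic follows; the function name and signature are hypothetical, not part of any Kafka API:

```python
def resolve_start_offset(committed, policy, earliest, latest):
    """Illustrative sketch of how a consumer picks its start offset.

    `committed` is the group's committed offset (or None if absent, as in
    the NoNodeException above); `earliest`/`latest` are the partition's
    low and high watermarks. All names here are hypothetical.
    """
    if committed is not None:
        # A committed offset always wins; auto.offset.reset is ignored.
        return committed
    if policy in ("largest", "latest"):
        return latest    # start at the end: see only new messages
    if policy in ("smallest", "earliest"):
        return earliest  # start at the beginning: re-read everything
    raise ValueError("unknown auto.offset.reset policy: %r" % policy)

# No committed offset (the NoNodeException case) with policy "largest":
print(resolve_start_offset(None, "largest", earliest=0, latest=6))  # 6
# With a committed offset of 5, the policy is irrelevant:
print(resolve_start_offset(5, "largest", earliest=0, latest=6))     # 5
```

Under these documented semantics, "largest" with no committed offset should start at the end of the partition, so re-reading all 6 messages suggests the consumer is not actually picking up the committed offsets (or the configured reset policy) at all, which is consistent with the missing ZK node.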