在卡夫卡消费活的消息

问题描述:

我已经开始了我的动物园管理员和卡夫卡服务器。 我开始了我的卡夫卡制作人,他发送了10封主题为'xxx'的信息。然后停下了我的卡夫卡制片人。 现在我开始了我的卡夫卡消费者并订阅了主题'xxx'。我的消费者使用我的Kafka制作人发送的10条消息,该消息现在不在运行。 我需要我的Kafka使用者应该只使用运行Kafka服务器的消息。 有什么办法可以做到这一点? 以下是我的消费属性中的内容。在卡夫卡消费活的消息

props.put("bootstrap.servers", "localhost:9092"); 
    String consumeGroup = "cg1"; 
    props.put("group.id", consumeGroup); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.offset.reset", "earliest"); 
    props.put("auto.commit.interval.ms", "100"); 
    props.put("heartbeat.interval.ms", "3000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 
+0

您可以丢弃任何正在等待的消息,只能在程序启动后到达。如果您只想要非持久性消息传递几乎任何其他消息传递解决方案可能是更好的选择,则Kafka旨在持久保留消息(因为您需要此功能,因此使用Kafka)。 –

+1

是否有任何标志可以识别等待很长时间的消息? – Prasath

+0

删除此属性'props.put(“auto.offset.reset”,“earliest”);',它与ConsumerConfig.AUTO_OFFSET_RESET_CONFIG相同。只保留最新的属性。 – Rambler

设置以下属性:

consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 

它告诉消费者只阅读最新的消息,那就是,这是消费者以后公布消息开始。

+0

为我的消费者使用提供的财产。 Stil我正在收到非现场制作人的数据。 – Prasath

+0

您正在使用哪个kafka api版本? – Rambler

+0

以下依赖关系我使用过。此外,我更新了我的问题中的消费者属性。 \t \t \t ch.qos.logback \t \t \t 的logback经典 \t \t \t 1.0.13 \t \t \t \t \t \t \t org.apache。卡夫卡 \t \t \t 卡夫卡客户 \t \t \t 0.9.0.0 \t \t \t \t \t \t \t org.apache.avro \t \t \t 的Avro \t \t \t 1.7.7 \t \t Prasath