在kafka中发送同步消息?
问题描述:
如何发送kafka中的同步消息?
实现它的一种方法可以通过设置属性参数max.in.flight.requests.per.connection = 1
。在kafka中发送同步消息?
但我想知道是否有以kafka发送同步消息的直接或替代方式。 (类似producer.syncSend(...)等)。
答
生产者API从send
返回Future
。您可以拨打Future#get
以阻止发送完成。
看到这个example from the Javadocs:
如果要模拟一个简单的阻塞调用你可以调用立即get()方法:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record =
new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
答
的蒂洛提出的答案是必经之路走。通常,关于使用max.in.flight.requests.per.connection = 1的建议用于启用仍然重试的消息,但不会丢失消息排序。它不是用于拥有同步制作者。
你能解释你为什么要这么做吗?如果这是关于消息顺序保证的话,这可能会更复杂一些(并且同步发送它们并不会真正改变那里的东西)。 – Thilo
由同一个生产者发送到同一主题中同一分区的消息将保留该顺序。跨多个分区,主题或生产者没有订单保证。如果需要,您必须在应用程序代码中的消费者端安排消息(例如通过查看消息时间戳)。 – Thilo
@Thilo,这引起我另一个问题。每个分区或每个主题或每个生产者是否批量生产?或者这些的一些组合。 ? – joveny