在kafka中发送同步消息?

问题描述:

如何发送kafka中的同步消息?
实现它的一种方法可以通过设置属性参数
max.in.flight.requests.per.connection = 1在kafka中发送同步消息?

但我想知道是否有以kafka发送同步消息的直接或替代方式。 (类似producer.syncSend(...)等)。

+0

你能解释你为什么要这么做吗?如果这是关于消息顺序保证的话,这可能会更复杂一些(并且同步发送它们并不会真正改变那里的东西)。 – Thilo

+0

由同一个生产者发送到同一主题中同一分区的消息将保留该顺序。跨多个分区,主题或生产者没有订单保证。如果需要,您必须在应用程序代码中的消费者端安排消息(例如通过查看消息时间戳)。 – Thilo

+0

@Thilo,这引起我另一个问题。每个分区或每个主题或每个生产者是否批量生产?或者这些的一些组合。 ? – joveny

生产者API从send返回Future。您可以拨打Future#get以阻止发送完成。

看到这个example from the Javadocs

如果要模拟一个简单的阻塞调用你可以调用立即get()方法:

byte[] key = "key".getBytes(); 
byte[] value = "value".getBytes(); 
ProducerRecord<byte[],byte[]> record = 
    new ProducerRecord<byte[],byte[]>("my-topic", key, value) 
producer.send(record).get(); 

的蒂洛提出的答案是必经之路走。通常,关于使用max.in.flight.requests.per.connection = 1的建议用于启用仍然重试的消息,但不会丢失消息排序。它不是用于拥有同步制作者。