卡夫卡生产者:如何处理“java.net.ConnectException:连接拒绝”
问题描述:
我使用卡夫卡0.10.1.0。卡夫卡生产者:如何处理“java.net.ConnectException:连接拒绝”
这是我的制片人:
val props: Properties = ...
val producer = new KafkaProducer[String, AnyRef](props)
val callback = new Callback {
override def onCompletion(md: RecordMetadata, e: Exception): Unit = ...
}
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
但万一上面的回调不能处理java.net.ConnectException: Connection refused
当卡夫卡服务器关闭。
UPD
的ConnectionException
在另一个线程被升高(到其中使用到KafkaProducer
Sender
类)。因此我们不能使用try {} catch
。此外,我不需要重试机制,我需要一种方法来处理这种情况(例如,如果Kafka已关闭,生产者无法发送消息,那么我将使用另一个Queue API)。
有没有办法处理这个异常?
答
您有几个选项。斯卡拉提供了一种方法来捕获异常,其采用以下形式:
try {
// ...
}
catch {
case ioe: IOException => ... // more specific cases first !
case e: Exception => ...
}
所以最简单的方法是:
try {
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
}
catch {
case ce: ConnectionException => // handle exception
}
更复杂,但是更强大的将是一个重试机制:
What's the Scala way to implement a retry-able call like this one?
另请注意,卡夫卡制作人内置了一个重试机制, 也可能证明helpf UL:
设置的值大于零的意志 导致客户端重新发送它的发送失败了 可能瞬时错误的任何记录。请注意,如果客户端在接收到错误时重新发送记录,则该重试与 没有区别。允许 重试但未将max.in.flight.requests.per.connection设置为1 将潜在地更改记录的排序,因为如果两个批次 被发送到单个分区,并且第一个失败并重试,但第二个成功则返回 ,那么第二批中的记录可能首先出现 。
嗨@crypto,谢谢你的回答。但是这种方式将不起作用,因为'ConnectionException'在另一个线程中被引发(进入'KafkaProducer'中使用的'Sender'类)。因此我们不能使用'try {} catch'。 –
单独的线程是你的回调吗? – crypto
是的,这是一个单独的线程,在'producer.send'方法内创建的 –