卡夫卡生产者:如何处理“java.net.ConnectException:连接拒绝”

问题描述:

我使用卡夫卡0.10.1.0。卡夫卡生产者:如何处理“java.net.ConnectException:连接拒绝”

这是我的制片人:

val props: Properties = ... 
val producer = new KafkaProducer[String, AnyRef](props) 
val callback = new Callback { 
    override def onCompletion(md: RecordMetadata, e: Exception): Unit = ... 
} 
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback) 

但万一上面的回调不能处理java.net.ConnectException: Connection refused当卡夫卡服务器关闭。

UPD

ConnectionException在另一个线程被升高(到其中使用到KafkaProducerSender类)。因此我们不能使用try {} catch。此外,我不需要重试机制,我需要一种方法来处理这种情况(例如,如果Kafka已关闭,生产者无法发送消息,那么我将使用另一个Queue API)。

有没有办法处理这个异常?

您有几个选项。斯卡拉提供了一种方法来捕获异常,其采用以下形式:

try { 
    // ... 
    } 
    catch { 
    case ioe: IOException => ... // more specific cases first ! 
    case e: Exception => ... 
    } 

所以最简单的方法是:

try { 
    producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback) 
    } 
    catch { 
    case ce: ConnectionException => // handle exception 
    } 

更复杂,但是更强大的将是一个重试机制:

What's the Scala way to implement a retry-able call like this one?

另请注意,卡夫卡制作人内置了一个重试机制, 也可能证明helpf UL:

设置的值大于零的意志 导致客户端重新发送它的发送失败了 可能瞬时错误的任何记录。请注意,如果客户端在接收到错误时重新发送记录,则该重试与 没有区别。允许 重试但未将max.in.flight.requests.per.connection设置为1 将潜在地更改记录的排序,因为如果两个批次 被发送到单个分区,并且第一个失败并重试,但第二个成功则返回 ,那么第二批中的记录可能首先出现 。

+0

嗨@crypto,谢谢你的回答。但是这种方式将不起作用,因为'ConnectionException'在另一个线程中被引发(进入'KafkaProducer'中使用的'Sender'类)。因此我们不能使用'try {} catch'。 –

+0

单独的线程是你的回调吗? – crypto

+0

是的,这是一个单独的线程,在'producer.send'方法内创建的 –