有没有办法阻止自定义Kafka Deserializers的无限重试?

问题描述:

我正在使用一个自定义的Kafka反序列化器,它可以封送来自JSON的对象。有没有办法阻止自定义Kafka Deserializers的无限重试?

val props = Map(
    "bootstrap.servers" -> kafkaQueue.kafkaHost, 
    "group.id" -> adapterServer.adapterConfig.kafkaGroup, 
    "enable.auto.commit" -> "false", 
    "max.poll.records" -> "1", 
    "auto.offset.reset" -> "earliest", 
    "auto.commit.interval.ms" -> "1000", 
    "key.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer", 
    "value.deserializer" -> "com.mystuff.CustomJSONDeserializer" 
) 
new KafkaConsumer[Array[Byte], MyMessage](props) 

有一件事情我已经看到的是,如果有人发布不良JSON的话题,卡夫卡试图与我的自定义解串器反序列化它 - 不能。 CustomJSONDeserializer引发异常,但Kafka只是继续尝试。

因此,它只是无限尝试重新反序列化不良的JSON,实质上陷入困境。由于这一切都发生在卡夫卡内部,我不知道如何阻止它,并告诉它继续下一条消息。

我该如何避免这种情况?

如果您发现异常,您的应用程序可以搜索通过受影响的消息以避免它。 0.10.0.0消费者实际上有一个错误masking deserialization errors,所以我不确定早期版本是否能正确传播异常。我们至少会记录关于即将发布的0.10.0.1版本中导致错误的消息的主题,分区和偏移的信息。