rabbimq消费者实现异常重试机制

功能描述

异常重试指的是当消费者处理消息异常失败时,为保证数据最终一致性,通过设置重试策略来对消息进行重复再消费。对于重试策略我们指定延迟多长时间重试一次,重试多少次,以及时间单位等。

策略描述

原理:利用rabbitmq的死信原理,参照上一篇文章rabbimq队列之死信队列和延迟队列

参数:TimeUnit(延迟时间单位),retryDelayTime(long型,失败后过多久的时间执行),retries(int型,重试次数)

原理图:

rabbimq消费者实现异常重试机制

其中,有几个点需要说明一下

1.normal_exchange_delay和normal_queue_delay是死信队列,设置了队列属性x-message-ttl的属性,即指定了延时时间,达到x-message-ttl设置的时间未消费的就会自动路由到normal_exchange_requeue交换器,从而实现了延迟重试。这里需要解释的是为什么要重新创建一个新的交换器normal_exchange_requeue:因为当你的normal_exchange绑定多个队列,且是topic类型的交换器时,如果你的死信队列指定的死信交换器是该交换器,消息存在发送到多个队列的风险,会对别的系统造成重复消费。

2.max retry如何获取,rabbitmq有一个属性,就是当一个消息进入到死信队列时,消费者收到的AMQP.BasicProperties参数中的headers属性中的cout参数就会加1,利用这个属性我们可以获取到该消息被消费了多少次。代码:

(Long) (((HashMap) ((ArrayList) basicProperties.getHeaders().get("x-death")).get(0)).get("count"));

注意点

1.这里的重试机制是利用新建对应的无消费者的死信队列,通过设置x-message-ttl来实现延时重试的。也就是说初始化的时候对应队列的重试队列就创建完成了。但是rabbitmq队列有个特性,就是一旦被创建,队列的属性就不允许更改,若你再次创建同名不同属性的队列就会引发channel异常关闭。这样第一个问题就出现了,如何可以动态的修改延迟重试时间呢,我们可以利用两个工具达成

    a.每次初始化时查看对应队列的反哺队列是否存在,channel.queueDeclarePassive(queueName)

    b.存在的话利用rabitmq提供的API获取反哺队列的x-message-ttl属性http://ip:15672/api/queues/vhost/queueName            HTTP1.1 GET

    c.对比初始化传入的参数和利用httpAPi 获取的参数是否一样,一样不做处理,不一样删除以前的反哺队列,新建。

2.不支持集群环境下的重试机制,即当有多个不同jvm的consumer消费同一个队列时,一个consumer重试机制的修改会导致其余消费者的重试机制失效,或是一个添加了重试策略对不同的jvm不起作用,当然可以考虑利用zookeeper实现。