006 rabbitmq生产者消息可靠投递实现

一、前言:

1)生产者的消息,往mq服务器投递时,因网络中断、抖动等原因导致投递失败;

2)本文采取实现ConfirmCallback接口,对消息投递进行确认;

a、若返回true,则认为消息已成功投递;

b、若返回false,则认为消息投递失败,需要重新投递;

3)ConfirmCallback接口在消息不能到达exchange时进行回调;

4)模拟消息投递失败的方案:在消息投递时,强制关闭connection连接;

 

二、可靠投递的具体方案:

1)生产者在发送消息之前,生成ack唯一确认id;

2)以ackId为键,消息为value,保存进redis缓存;

3)实现ConfirmCallback接口;

4)ConfirmCallback触发回调时,若ack为true,则直接删除此次ackId对应的msg;若ack为false,则走5步骤;

5)若ack为false,则将该ackId对应的msg取出,放置进失败的ackFailList中;

6)开启定时任务,扫描ackFailList,以一定策略重发失败的msg;

 

上述方案会产生消息重复发送的问题,考虑到消息发送的速度应尽可能快,才能支持更高的并发。故去重动作,放到了消费端实现。具体方案如下:

消息重复消费处理方案:

1)使用业务ID,即businessId,作为消费唯一依据;(businessId可设计为该业务的具体业务ID,或是设计为与业务无关的唯一性ID)

2)生产者在发送消息时,将businessId、businessBody封装在BusinessMsg中;

3)消费者在接收到BusinessMsg后,判断bisinessId是否在已消费集合;

4)若已消费集合存在该businessId,则不再消费;

5)否则,消费businessId代表的业务消息;消费完毕后,将该businessId保存进已消费集合;

 

详述:

1、定义队列;

006 rabbitmq生产者消息可靠投递实现

2、定义生产者;

006 rabbitmq生产者消息可靠投递实现

 

3、定义消费者;

006 rabbitmq生产者消息可靠投递实现

 

4、对信道开启confirm模式;

006 rabbitmq生产者消息可靠投递实现

 

4、实现ConfirmCallback接口;

006 rabbitmq生产者消息可靠投递实现

 

006 rabbitmq生产者消息可靠投递实现

 

5、测试定义;

006 rabbitmq生产者消息可靠投递实现

 

6、消息投递丢失测试,测试数据量为3w;

1)在消息投递过程中,强制关闭connection连接;

006 rabbitmq生产者消息可靠投递实现

2)测试结果如下:

006 rabbitmq生产者消息可靠投递实现

3)结论:

a)消息没有丢失;

b)消息接收重复了,总量为39548条;

c)消息消费符合预期,总量为3w条,正常;

7、demo地址;

https://github.com/haishui211/rabbitmqRep.git

8、参考地址;

https://www.jianshu.com/p/6579e48d18ae