springboot整合rabbitmq实现消息确认机制

本来想些关于springboot整合rabbitmq中的direct模式,结果出现了消息无法全部消费完。等这个bug解决了再写这个吧

今天先写rabbitmq的消息确认机制

先修改配置文件

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
    publisher-confirms: true #开启消息确认机制
    publisher-returns: true #支持消息发送失败返回队列
@Configuration
public class TopicConfiguration {

    @Bean
    public Queue queueA(){
        return new Queue("topicC");
    }

    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    Binding binding(Queue queueA, TopicExchange topicExchange){
        return BindingBuilder.bind(queueA).to(topicExchange).with("topic");
    }
}

 先看消费端的代码:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);中的false,是将自动确认机制关闭

关于channel接口的使用请参考

https://www.cnblogs.com/piaolingzxh/p/5448927.html

@Component
public class TopicReceive {

    @RabbitHandler
    @RabbitListener(queues = "topicC")
    public void receive(Message message, Channel channel) throws IOException {
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("接受到的消息为"+new String(message.getBody(),"UTF-8"));
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
        }
    }
}

发送端代码:

实现RabbitTemplate.ConfirmCallback接口,重写confirm方法

@Component
public class TopicSend implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg){
        rabbitTemplate.setConfirmCallback(this);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("发送的消息为"+msg);
        this.rabbitTemplate.convertAndSend("topicExchange","topic",msg,correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息发送确认成功");
        } else {
            System.out.println("消息发送确认失败:" + cause);
        }
    }
}

 测试结果会发现正常

但是如果我们将exchange修改,发送到一个不存在的exchange中,会怎么样呢?

会发现confirm回调为false,打印出结果为不存在topicExchange1111的交换机

springboot整合rabbitmq实现消息确认机制

 如果我们在消费端处理逻辑时出错会怎么样呢?修改消费端代码我们在消费时让它报错

springboot整合rabbitmq实现消息确认机制

confirm回调为true,但是在rabbitmq的web界面会发现存在一条没有消费的消息

springboot整合rabbitmq实现消息确认机制

如果我们把channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);中最后一个参数改为false呢,会发现在web管理界面没有未被消费的消息,说明这条消息已经被摒弃

实际开发中,到底是打回到队列呢还是摒弃,要看自己的需求,但是打回队列应该有次数限制,不然会陷入死循环。

继续修改发送端的代码,在原先的基础上在实现RabbitTemplate.ReturnCallback接口。同时在发送端添加

rabbitTemplate.setReturnCallback(this)以及在application.yml中添加

springboot整合rabbitmq实现消息确认机制

@Component
public class TopicSend implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("发送的消息为"+msg);
        this.rabbitTemplate.convertAndSend("topicExchange","topic",msg,correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息发送确认成功");
        } else {
            System.out.println("消息发送确认失败:" + cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        try {
            System.out.println("return--message:" + new String(message.getBody(), "UTF-8") + ",replyCode:" + replyCode
                    + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
        } catch (UnsupportedEncodingException e) {
        }
    }
}

 发送时,将routingKey修改为一个没有的key,

打印结果:

springboot整合rabbitmq实现消息确认机制

结论:

如果消息没有到exchange,则confirm回调,ack=false

如果消息到达exchange,则confirm回调,ack=true

exchange到queue成功,则不回调return

exchange到queue失败,则回调return