(9)RabbitMQ消息持久化、Ack确认与重回队列

前面几节讲了RabbitMQ的入门以及几种常用交换机的使用方法,从本节开始将会介绍RabbitMQ的高级特性,这节就先介绍一下RabbitMQ的消息持久化、消费者Ack确认、消费者Nack拒绝与requeue重回队列。

RabbitMQ消息持久化

RabbitMQ持久化介绍

之前的博客代码例子中发的消息都是非持久化的,就是只会存储到内存中,如果RabbitMQ服务器重启、关闭、意外挂掉的话,还没来得及消费的消息就会丢失掉,这样是非常不安全的。为了防止RabbitMQ的消息丢失,我们可以开启持久化开关,RabbitMQ的持久化分三个部分:交换机的持久化、队列的持久化、消息的持久化。

注意:开启消息持久化的话,因为要写入磁盘文件,效率肯定要比不开启要低一些,不过是可以接受的,生产上一般都会开启持久化的,除非消息不是特别重要,可以容忍消息的丢失。

交换机持久化:可以在声明交换机的时候将durable设为true,例如channel.exchangeDeclare("persitent_exchange", "fanout", true,false,null) 表示声明了一个名字为persitent_exchange、fanout类型、持久化、非自动删除的exchange。如果交换机不设置持久化,那么RabbitMQ重启之后该交换机就不存在了,不过消息还在,但是不能往该交换机发送消息,一般都会设为持久化的。

队列持久化:可以在声明队列的时候将durable设为true,例如 channel.queueDeclare("persistent_queue", true, false, false, null) 表示声明了一个名字为persistent_queue、持久化、非排他、非自动删除的队列。如果队列不设置持久化,那么RabbitMQ重启之后该队列上的消息就丢失了。

消息持久化:发送消息的方法为channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body),持久化可以在第三个参数props中设置,其部分源码如下:里面列举了所有的14种属性

public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType;
        private String contentEncoding;
        private Map<String,Object> headers;
        private Integer deliveryMode;
        private Integer priority;
        private String correlationId;
        private String replyTo;
        private String expiration;
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;

        //省略............
}

其中deliveryMode为持久化相关的属性,将deliveryMode设为2表示设置消息持久化,可以通过如下代码设置持久化属性

BasicProperties props=new BasicProperties().builder().deliveryMode(2).build();

还可以通过BasicProperties props= MessageProperties.PERSISTENT_TEXT_PLAIN; 的方式设置,其实这种方式只不过是上面的封装罢了。

    /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
    public static final BasicProperties PERSISTENT_TEXT_PLAIN =
            new BasicProperties("text/plain",
                            null,
                            null,
                            2,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);

代码及演示步骤

生产者代码如下:

package cn.wkp.rabbitmq.newest.persistent;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import cn.wkp.rabbitmq.util.ConnectionUtil;

public class Send {

    private final static String EXCHANGE_NAME = "persistent_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        // 声明交换机-持久化,非自动删除
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true,false,null);

        // 消息内容
    	String message = "这是一条持久化消息";
    	BasicProperties props= MessageProperties.PERSISTENT_TEXT_PLAIN;
//    	BasicProperties props=new BasicProperties().builder().deliveryMode(2).build();
	channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
    	System.out.println("Sent message:" + message);
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

消费者代码如下:

package cn.wkp.rabbitmq.newest.persistent;

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cn.wkp.rabbitmq.util.ConnectionUtil;

public class Recv {

	private final static String EXCHANGE_NAME = "persistent_exchange";
	private final static String QUEUE_NAME = "persistent_queue";

	public static void main(String[] argv) throws Exception {

		// 获取到连接以及mq通道
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();

		// 声明交换机-持久化,非自动删除
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true,false,null);
		// 声明队列
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                // 绑定队列到交换机
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		
		//指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。
		channel.basicQos(1);

		// 定义队列的消费者
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消费者收到消息:" + new String(body));
				//消费者手动发送ack应答
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		// 监听队列
		channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}

操作步骤:

  • 第一步:先启动消费者,这样会帮我们创建持久化的交换机和队列,并将队列与交换机进行绑定;
  • 第二步:关闭消费者(防止消息一经发出就立马被消费了),然后运行生产者发送消息。
  • 第三步:重启RabbitMQ服务,然后通过 http://ip:15672 登录web管理台查看队列、交换机、消息是否还在进行验证。
(9)RabbitMQ消息持久化、Ack确认与重回队列
持久化交换机

 

(9)RabbitMQ消息持久化、Ack确认与重回队列
持久化队列和持久化消息

我们通过web管理台可以看到,重启服务之后数据都还在。

  •  第四步:启动消费者重新消费消息,控制台输出:消费者收到消息:这是一条持久化消息,验证成功。

RabbitMQ的消息确认机制与重回队列

RabbitMQ的消息状态

在我们上面验证消息持久化的截图中,在Messages中有两个状态:Ready 和 Unacked,但是要消费时设置为手动确认,下面分别对这两个状态做个简介。

Ready:按字面意思就是准备好了,可以投递给消费者了,对于未开启持久化的消息写入内存即为Ready状态;如果开启持久化了,则要持久化到磁盘之后才会变成Ready状态。

Unacked(Unacknowledged——未确认的):表示已经投递到消费者但是还没有收到消费者Ack确认时的消息状态。

验证方式:我们可以把上面持久化的代码示例稍作修改,在消费者调用channel.basicAck方法之前休眠个几十秒即可,然后查询消息的状态如下所示:可以看到Unacked状态的消息条数为1。

(9)RabbitMQ消息持久化、Ack确认与重回队列

消费者Ack机制与重回队列

为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制( message acknowledgement) 。我们上面的例子中其实已经用到了这点,通过channel.basicConsume(String queue, boolean autoAck, Consumer callback) 订阅消费队列上的消息时,第二个参数autoAck表示是否自动确认。当autoAck 设为true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除;当autoAck 设为false时, RabbitMQ 会等待消费者通过 basicAck(long deliveryTag, boolean multiple)方法,显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除) 。

当autoAck 参数置为false ,对于RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息,即上面介绍的Ready状态;一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息,即Unacked状态。如果RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接,则RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者。可以通过消费者休眠,把消费者关掉然后再启动即可验证。

消费者确认:basicAck(long deliveryTag, boolean multiple),其中deliveryTag 可以看作消息的编号,它是一个64 位的长整型值;multiple一般设为false,如果设为true则表示确认当前deliveryTag 编号及之前所有未被当前消费者确认的消息。

消费者拒绝:basicNack(long deliveryTag, boolean multiple, boolean requeue),其中deliveryTag 可以看作消息的编号,它是一个64 位的长整型值。multiple一般设为false,如果设为true则表示拒绝当前deliveryTag 编号及之前所有未被当前消费者确认的消息。requeue参数表示是否重回队列,如果requeue 参数设置为true ,则RabbitMQ 会重新将这条消息存入队列尾部(注意是队列尾部),等待继续投递给订阅该队列的消费者,当然也可能是自己;如果requeue 参数设置为false ,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。

代码演示

我们发送消息的时候,传递一个消息的序号,在消费者处理的时候判断这个序号如果为1则拒绝,其他情况都确认接收,代码如下所示:

package cn.wkp.rabbitmq.newest.ack;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import cn.wkp.rabbitmq.util.ConnectionUtil;

public class Send {

    private final static String EXCHANGE_NAME = "ack_exchange";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        // 声明交换机-持久化,非自动删除
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true,false,null);

        // 消息内容
    	for(int i=0;i<4;i++) {
    		String message = "消息:"+i;
	    	BasicProperties props= MessageProperties.PERSISTENT_TEXT_PLAIN;
		channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
	    	System.out.println("Sent message:" + message);
    	}
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}
package cn.wkp.rabbitmq.newest.ack;

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cn.wkp.rabbitmq.util.ConnectionUtil;

public class Recv {

	private final static String EXCHANGE_NAME = "ack_exchange";
	private final static String QUEUE_NAME = "ack_queue";

	public static void main(String[] argv) throws Exception {

		// 获取到连接以及mq通道
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();

		// 声明交换机-持久化,非自动删除
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true,false,null);
		// 声明队列
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                // 绑定队列到交换机
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		
		// 定义队列的消费者
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body);
				//消息序号
				int num=Integer.valueOf(msg.split(":")[1]);
				System.out.println("消费者收到消息:" + new String(body));
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				if(num==1) {
					System.out.println("消费者收到消息:" + new String(body)+",拒绝了这条消息");
					//requeue为true表示让消息重回队列,放入队列尾部,如果为false则会删除当前消息
					channel.basicNack(envelope.getDeliveryTag(), false, true);
				}else {
					System.out.println("消费者收到消息:" + new String(body)+",接收了这条消息");
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		// 监听队列,设置为手动确认
		channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}

运行结果如下:可以看到消息1被拒绝之后,设置了重回队列,存到了队列的尾部;然后重新投递给消费者,又被拒绝,重回队列,如此循环往复。。。。。。

注意:如果设置了重回队列,因为某种原因消息无法被正确处理,就会一直重复循环重回队列——>消费拒绝——>重回队列,这样并不好,生产上不建议这样设置。如果要设置重回队列的话,要设置最大处理次数,例如为3,记录消费者处理失败的次数,当处理失败次数小于3调用Nack重回队列;如果达到了最大重试次数,则调用Ack删除消息,同时持久化该消息,后期采用定时任务或者手工处理。

你可以将requeue参数设为false,运行后可以看到被Nack的消息会被删除。

Sent message:消息:0
Sent message:消息:1
Sent message:消息:2
Sent message:消息:3
消费者收到消息:消息:0
消费者收到消息:消息:0,接收了这条消息
消费者收到消息:消息:1
消费者收到消息:消息:1,拒绝了这条消息
消费者收到消息:消息:2
消费者收到消息:消息:2,接收了这条消息
消费者收到消息:消息:3
消费者收到消息:消息:3,接收了这条消息
消费者收到消息:消息:1
消费者收到消息:消息:1,拒绝了这条消息
消费者收到消息:消息:1
消费者收到消息:消息:1,拒绝了这条消息
消费者收到消息:消息:1
消费者收到消息:消息:1,拒绝了这条消息
............

//消费1一直循环重回队列

本节介绍的Ack确认是消息消费者的确认,下一节会介绍一下消息生产者的确认,总共有两种方式:事务机制和confirm机制,希望继续关注。