rabbitmq的消费限流的详细讲解和代码演示

什么是消费端的限流?

假设一个场景,首先,我们rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况:
    巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多的数据。
    rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
    使用 basicqos方法。在消费端进行使用。 0 1 false
    prefetSize:0
    prefetCount:这个值一般在设置为非自动ack的情况下生效,一般大小为1
    global: true是channel级别, false是消费者级别
注意:我们要使用非自动ack

代码演示

生产者

package com.mq.rabbit.limit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author Mqs
 * @date 2018/10/27 23:53
 * @desc
 */
public class Producer {
    public static void main(String[] args) throws Exception{
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setHost("192.168.200.130");
        factory.setUsername("mqs");
        factory.setPassword("mqs123");
        // 2、创建连接
        Connection connection = factory.newConnection();
        // 3、获取通道
        Channel channel = connection.createChannel();
        // 4、声明
        String exchangeName = "limit_exchange";
        String routingKey = "limit.key";

        String msg = "send message test limit mandatory ";

        // 5、生产者发送消息
        for (int i = 0; i < 6; i++){
            channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
        }



    }
}

消费者

package com.mq.rabbit.limit;

import com.rabbitmq.client.*;

/**
 * @author Mqs
 * @date 2018/10/27 23:53
 * @desc
 */
public class Consumer {
    public static void main(String[] args)throws Exception {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setHost("192.168.200.130");
        factory.setUsername("mqs");
        factory.setPassword("mqs123");
        // 2、创建连接
        Connection connection = factory.newConnection();
        // 3、获取通道
        Channel channel = connection.createChannel();
        // 4、声明
        String exchangeName = "limit_exchange";
        String routingKey = "limit.key";
        String exchangeType = "direct";
        String queueName = "limit_queue";
        // 5、声明一个交换器
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
        // 6、声明一个队列
        channel.queueDeclare(queueName, true, false, false, null);
        // 7、绑定队列到交换器
        channel.queueBind(queueName, exchangeName, routingKey);
        // TODO 限流的设置
        // TODO 参数一: 消息的大小不做任何限制 参数二: 服务器给的最大的消息数,这里是一条一条的消费  参数三: 级别为consumer
        // TODO prefetCount 为 1, 一次消费一条消息,如果消费者没有确认消费,将不会接受生产者给的消息
        channel.basicQos(0, 1, false);
        // 8、消费者,要想做限流必须将自动ack设置为false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

    }
}

自定义的消费者

package com.mq.rabbit.limit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * @Author Mqs
 * @Date 2018/10/27 23:47
 * @Desc 自定义的消费者
 */
public class MyConsumer extends DefaultConsumer {

    private Channel channel;
    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("==handle message====");
        System.out.println("==consumerTag: " + consumerTag);
        System.out.println("==envelope: " + envelope);
        System.out.println("==properties: " + properties);
        System.out.println("==body: " + new String(body));

        // TODO 第二个参数设置为false,因为消费者设置的为prefetCount1
//        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

将消费端的确认给注释以后的结果rabbitmq的消费限流的详细讲解和代码演示

rabbitmq的消费限流的详细讲解和代码演示

将代码放开结果演示:

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("==handle message====");
        System.out.println("==consumerTag: " + consumerTag);
        System.out.println("==envelope: " + envelope);
        System.out.println("==properties: " + properties);
        System.out.println("==body: " + new String(body));

        // TODO 第二个参数设置为false,因为消费者设置的为prefetCount1
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

结果:

rabbitmq的消费限流的详细讲解和代码演示

rabbitmq的消费限流的详细讲解和代码演示