SpringBoot+RabbitMQ实现延迟队列

Rabbitmq实现延时队列一般而言有两种形式:

       方式一:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)

       方式二:利用rabbitmq中的插件x-delay-message

方式一:

       此方式使用的是direct模式的Exchange交换机。

原理图:

SpringBoot+RabbitMQ实现延迟队列

原理图解释:

       1、声明延迟队列(普通队列声明方式),在声明时设置x-dead-letter-exchange和x-dead-letter-routing-key两个属性(具体含义参见下面代码中的解释);

       2、绑定延迟队列到交换机DEAD_LETTER_EXCHANGE(普通交换机绑定方式,不要被死信交换机这个名字忽悠了),注意,要指定路由键DELAY_ROUTING_KEY

       3、在消息生产者中向指定交换机DEAD_LETTER_EXCHANGE和指定路由键DELAY_ROUTING_KEY发送消息,消息会被发送到延迟队列DELAY_QUEUE上;

       4、注意,延迟队列DELAY_QUEUE没有消费者接收的,即在消费者中不要监听此延迟队列。如果延迟队列DELAY_QUEUE有消费者的话,那么此队列中的消息就会被立即消费,而不会因为设置的过期时间到期后过期,也就不会成为死信(Dead Letter),那更不会再向绑定的死信交换机发送信息--也就不会路由到新的队列。

       5、在延迟队列中的消息到了过期时间之后,因为设置了x-dead-letter-exchange和x-dead-letter-routing-key两个属性,队列中的消息会变成死信消息,之后会根据绑定的死信交换机和路由键将此死信消息重新发送到指定的交换机IMMEDIATE_EXCHANGE到达指定队列IMMEDIATE_QUEUE,消费者中实际监听的是最后这个IMMEDIATE_QUEUE队列,至此,实现了延迟队列的功能。

方式一常量配置:

package com.shengquan.config;
/**
 *
@author:shengquan
 
* @Date:2019/10/11 17:30
 */

public class RabbitMQDelayConstants {
   
public static final String IMMEDIATE_QUEUE = "immediate_queue";                      //立即消费队列
    public static final String DELAY_QUEUE = "delay_queue";                              //延迟队列
    public static final String IMMEDIATE_EXCHANGE = "immediate_exchange";
   
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";            //死信交换机
    public static final String IMMEDIATE_ROUTING_KEY = "immediate_routing_key";          //立即消费路由键
    public static final String DELAY_ROUTING_KEY = "delay_routing_key";                  //延迟路由键
}

方式一配置类:

队列声明、交换机声明、队列与交换机的绑定配置类:

package com.shengquan.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 *
延迟队列实现方式1配置
 * @author:shengquan
 
* @Date:2019/10/12 9:11
 */

@Configuration
public class RabbitMQDelayConfigOne {
   
/* 1、队列声明 */
   
@Bean   // 创建一个立即消费队列
    public Queue immediateQueue() {
       
return new Queue(RabbitMQDelayConstants.IMMEDIATE_QUEUE, true);
   
}
   
@Bean   // 创建一个延迟队列, 此队列中的消息没有消费者去消费, 到了过期时间之后变成死信, 变死信之后会根据
            // 绑定的DLXroutingKey重新发送到指定交换机再到指定队列。
    public Queue delayQueue() {
       
Map<String, Object> params = new HashMap<>();
       
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", RabbitMQDelayConstants.IMMEDIATE_EXCHANGE);
       
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", RabbitMQDelayConstants.IMMEDIATE_ROUTING_KEY);
       
return new Queue(RabbitMQDelayConstants.DELAY_QUEUE, true, false, false, params);
   
}
   
/* 2、声明交换机 */
   
@Bean   //声明立即消费队列交换机-direct类型
    public DirectExchange immediateExchange() {
       
// 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
        //第三种在第二种参数上可以增加MapMap中可以存放自定义exchange中的参数
        return new DirectExchange(RabbitMQDelayConstants.IMMEDIATE_EXCHANGE, true, false);
   
}
   
@Bean   //声明死信队列交换机-direct类型
    public DirectExchange deadLetterExchange() {
       
return new DirectExchange(RabbitMQDelayConstants.DEAD_LETTER_EXCHANGE, true, false);
   
}
   
/* 3、绑定队列与交换机 */
   
@Bean   //把立即消费的队列和立即消费交换机绑定, immediate_exchange, 路由键:immediate_routing_key
   
public Binding immediateBinding() {
       
return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with(RabbitMQDelayConstants.IMMEDIATE_ROUTING_KEY);
   
}
   
@Bean   //把延迟消费的队列和死信交换机绑定, immediate_dead_exchange, 路由键:delay_routing_key
   
public Binding delayBinding() {
       
return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(RabbitMQDelayConstants.DELAY_ROUTING_KEY);
   
}
}

注意:

       这个配置中声明的延迟队列DELAY_QUEUE,是没有消费者接收的。如果延迟队列DELAY_QUEUE有消费者的话,那么此队列中的消息就会被立即消费,而不会因为设置的过期时间到期后过期,也就不会成为死信(Dead Letter),那更不会再向绑定的死信交换机发送信息--也就不会路由到新的队列。

方式一生产者:

package com.shengquan.sender;
import com.shengquan.config.RabbitMQDelayConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 *
延迟队列方式1-消息发送者
 * @author:shengquan
 
* @Date:2019/10/12 11:05
 */

/*@Component*/
@Service
@Slf4j

public class DelaySenderOne {
   
@Autowired
   
private RabbitTemplate rabbitTemplate;

   
/**
     *
向延迟队列发送消息
     * 注意这里设置的是setExpiration,而不是delay
     */
   
public void send(String msg, int delayTime) {
       
log.info("**delayTime" + delayTime);
       
this.rabbitTemplate.convertAndSend(
               
RabbitMQDelayConstants.DEAD_LETTER_EXCHANGE,
               
RabbitMQDelayConstants.DELAY_ROUTING_KEY,
               
msg,
               
message -> {
                    message
.getMessageProperties().setExpiration(String.valueOf(delayTime));
                   
log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " Delay1 sent.");
                   
return message;
               
});
   
}
}

方式一消费者:

package com.shengquan.reciever;
import com.rabbitmq.client.Channel;
import com.shengquan.config.RabbitMQDelayConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 *
延迟队列实现方式1-消息消费者
 * @author:shengquan
 
* @Date:2019/10/12 11:12
 */

@Component
@Slf4j

public class DelayRecieverOne {
   
@RabbitListener(queues = RabbitMQDelayConstants.IMMEDIATE_QUEUE)
   
public void directReciever1(Object object, String msg, Channel channel,
                               
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
       
try {
           
log.info("delay-IMMEDIATE_QUEUE-msg" + msg);
           
channel.basicAck(deliveryTag, false);
       
} catch (IOException e) {
            e
.printStackTrace();
           
try {
               
log.info("delay-IMMEDIATE_QUEUE-msg消息重回队列======");
               
channel.basicNack(deliveryTag, false, true);
           
} catch (IOException e1) {
                e1
.printStackTrace();
           
}
        }
    }

}

方式一总结:

       经过测试,我们可以发现,当我们先增加一条过期时间大(10000)的A消息进入,之后再增加一个过期时间小的(5000)消息B,并没有出现想象中的B消息先被消费,A消息后被消费,而是出现了当10000过去的时候,AB消息同时被消费,也就是B消息的消费被阻塞了。

       为什么会出现这样的现象呢?

       我们知道利用TTL DLX特性实现的方式,实际上在第一个延时队列C里面设置了dlx,生产者生产了一条带ttl的消息放入了延时队列C中,等到延时时间到了,延时队列C中的消息变成了死信,根据延时队列C中设置的dlx的exchange的转发规则,转发到了实际消费队列D中,当该队列中的监听器监听到消息时就会正式开始消费。那么实际上延时队列中的消息也是放入队列中的,队列满足先进先出而延时大的消息A还没出队,所以B消息也不能顺利出队。