Springboot集成RabbiMQ

  • 引入Jar包

这里使用的是gradle配置

compile('org.springframework.boot:spring-boot-starter-amqp')

  • 配置application.properties 

spring.rabbitmq.addresses = localhost:5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=gues

  • 配置rabbitConfig
  1. direct模式

       direct 类型的交换器路由规则很简单,它会把消息路由到那些 BindingKey RoutingKey

完全匹配的队列中。

/**
 * @ProjectName: payment
 * @Description: 消息队列配置类
 * @Author: zw
 */
@Configuration
public class RabbitConfig {
    @Bean
    Queue customQueue1() {
        return new Queue("customQueue1");
    }

    @Bean
    Queue customQueue2() {
        return new Queue("customQueue2");
    }

    @Bean
    Queue customQueue3() {
        return new Queue("customQueue3");
    }

    @Bean
    DirectExchange myDirectExchange() {
        return new DirectExchange("myDirectExchange");
    }

    @Bean
    Binding myBindingbuilder1() {
        return BindingBuilder.bind(customQueue1()).to(myDirectExchange()).with("myDirectRoutingKey");
    }

    @Bean
    Binding myBindingbuilder2() {
        return BindingBuilder.bind(customQueue2()).to(myDirectExchange()).with("myDirectRoutingKey");
    }

    @Bean
    Binding myBindingbuilder3() {
        return BindingBuilder.bind(customQueue3()).to(myDirectExchange()).with("test");
    }

}

 如上我配置了三个队列,将其中customQueue1和customQueue2绑定到myDirectExchange交换机并指定路由键为myDirectRoutingKey,将customQueue3绑定到myDirectExchange交换机并指定路由键为test

/**
 * author:zw
 */
@Service
public class RabbitMQSender {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendDirectMsg(Object msg) {
        rabbitTemplate.convertAndSend("myDirectExchange", "myDirectRoutingKey", msg);
    }
}

 如上 配置生产者类指定发送的交换机和路由键

/**
 * author:zw
 */
@Component
public class RabbitMQConsumer {

    @RabbitListener(queues = "customQueue1")
    public void process1(Message msg, Channel channel) throws IOException {
        System.out.println("进入customQueue1");
        if (null != msg && null != msg.getBody() && 0 != msg.getBody().length) {

        }
    }

    @RabbitListener(queues = "customQueue2")
    public void process2(Message msg, Channel channel) throws IOException {
        System.out.println("进入customQueue2");
        if (null != msg && null != msg.getBody() && 0 != msg.getBody().length) {

        }
    }

    @RabbitListener(queues = "customQueue3")
    public void process3(Message msg, Channel channel) throws IOException {
        System.out.println("进入customQueue3");
        if (null != msg && null != msg.getBody() && 0 != msg.getBody().length) {

        }
    }

}

如上分别监听三个队列的消息

Springboot集成RabbiMQ

如图是消息发送后的结果

由结果可见当发消息的时候设置路由键为myDirectRoutingKey,消息只会路由到customQueue1,customQueue2

2.fanout模式

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。 

@Bean
Queue customQueue1() {
    return new Queue("customQueue1");
}

@Bean
Queue customQueue2() {
    return new Queue("customQueue2");
}

@Bean
Queue customQueue3() {
    return new Queue("customQueue3");
}
@Bean
FanoutExchange myFanoutExchange(){
    return new FanoutExchange("myFanoutExchange");
}
@Bean
Binding myBindingbuilder1(){
    return BindingBuilder.bind(customQueue1()).to(myFanoutExchange());
}
@Bean
Binding myBindingbuilder2(){
    return BindingBuilder.bind(customQueue2()).to(myFanoutExchange());
}

如上,我们先注释了之前direct模式配置的关系代码将customQueue1和customQueue2重新绑定到myFanoutExchange

public void sendDirectMsg(Object msg){
    rabbitTemplate.convertAndSend("myFanoutExchange","",msg);
}

如上更改发送交换机为myFanoutExchange 消费端代码不变

Springboot集成RabbiMQ

如上图可见绑定了myFanoutExchange的队列都可以收到消息,因此customQueue3没有收到消息

3.topic模式

       与direct类型的交换器路由规则的完全匹配BindingKey和RoutingKey不同,topic类型的交换器在匹配规则上进行了扩展,,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

BindingKey 中可以存在两种特殊字符串"*"和"#",用于做模糊匹配,其中"*"用于匹配一个单词,”#”用于匹配多规格单词(可以是零个)。

@Bean
Queue customQueue1() {
    return new Queue("customQueue1");
}
@Bean
Queue customQueue2() {
    return new Queue("customQueue2");
}
@Bean
Queue customQueue3() {
    return new Queue("customQueue3");
}
@Bean
TopicExchange myTopicExchange(){
    return new TopicExchange("myTopicExchange");
}
@Bean
Binding myBindingbuilder1(){
    return BindingBuilder.bind(customQueue1()).to(myTopicExchange()).with("org.zhang.routing.key");
}
@Bean
Binding myBindingbuilder2(){
    return BindingBuilder.bind(customQueue2()).to(myTopicExchange()).with("org.zhang.#");
}
@Bean
Binding myBindingbuilder3(){
    return BindingBuilder.bind(customQueue3()).to(myTopicExchange()).with("*.zhang.*.key");
}

如图新建一个topic模式的交换机 将三个队列重新通过三种不同的routingKey进行绑定,其中2,3又通过使用“#”和“*”来做为一个模糊匹配

修改发送端如下


public void sendDirectMsg(Object msg){
    rabbitTemplate.convertAndSend("myTopicExchange","org.zhang.routing.key",msg);
}

查看结果

Springboot集成RabbiMQ

由图可见,三个队列同时收到了此消息,也就可以区分“#”和“*”的区别以及topic的扩展表现。

4.Headers

        headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容的headers 属性进行匹配。在绑定队列和交换器时制定一组键值对 当发送消息到交换器时,RabbitM 会获取到该消息的 headers (也是一个键值对的形式) ,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列 headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

 

rabbitMQ运转流程

在最初状态下,生产者发送消息的时候

  1. 生产者连接到 RabbitMQ Broker 建立一个连接( Connection) ,开启 个信道
  2.  生产者声明一个交换器 ,并设置相关属性,比如交换机类型、是否持久化等

(3)生产者声明个队列井设置相关属性,比如是否排他、是否持久化、是否自动删详细内容

生产者通过路由键将交换器和队列绑定起来生产者发送消息至 RabbitMQ Broker ,其中包含路由键、交换器等信息。

(6) 相应的交换器根据接收到的路由键查找相匹配的队列如果找到 ,则将从生产者发送过来的消息存入相应的队列中。

(8) 如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者。

(9) 关闭信道。

(1 0) 关闭连接。

 

消费者接收消息的过程:

(1)消费者连接到 RabbitMQ Broker ,建立一个连接Connection ,开启 个信道Channel

(2) 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,

以及做 些准备工作。

(3)等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。

(4) 消费者确认 ack) 接收到的消息

( 5) RabbitMQ 从队列中删除相应己经被确认的消息

6) 关闭信道。

7) 关闭连接。

 

这里存在两个概念,Connection和Channel,无论是生产者还是消费者都要和RabbitMQ Broker建立连接,这里连接就是一个TCP连接,也就是Connection.一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道--Channel,每个信道都会被指派一个唯一ID,信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。

Springboot集成RabbiMQ

这里之所以存在channel信道,是因为如果一个应用程序中存在多个线程需要从RabbiMQ中消费消息,或生产消息,那么必然需要建立很多个Connection,也就是许多个TCP连接。

而这对于操作系统而言建立和销毁TCP连接是非常消耗资源的,当MQ被频繁使用,会很快出现性能瓶颈,而RabbitMQ采用类似NIO的做法,选择TCP连接复用,可以减少性能开销,同时也便于管理。

 

  • rabbitMQ消息确认机制

 

这里只介绍confirm模式,事务模式就不演示了

 

rabbitmq消息确认 分为 生产者确认 和 消费者确认 两者不耦合

  1. 发布端确认

配置参数

#消息发送到交换机确认机制,是否确认回调
spring.rabbitmq.publisher-returns=true
#消息发送到交换机确认机制,是否返回回调
spring.rabbitmq.publisher-confirms=true

配置一个队列


@Bean
Queue  myQueue(){
    return new Queue("myQueue");
}
@Bean
DirectExchange myDirectExchange(){
    return new DirectExchange("myDirectExchange");
}
@Bean
Binding myBinding(){
    return BindingBuilder.bind(myQueue()).to(myDirectExchange()).with("my.routing.key");
}

配置发送类 注意这里需要实现两个回调类并设置一下template

/**
 * author:zw
 */
@Component
public class RabbitMQSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

    private static final Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);
    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void sendDirectMsg(Object msg){
        CorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());
        System.out.println("消息id" + correlationData.getId());
         rabbitTemplate.convertAndSend("myDirectExchange","my.routing.key",msg,correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("发布消息确认-消息id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息发送确认成功");
        } else {
            System.out.println("消息发送确认失败:" + cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode
                + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }

 

2.消费端确认

#监听异常重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
#是否开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
#重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms

#是否开启拒绝
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#确认机制 自动 手动
spring.rabbitmq.listener.simple.acknowledge-mode=manual

/**
 * author:zw
 */
@Component
public class RabbitMQConsumer {

    @RabbitListener(queues = "myQueue")
    public void process(Message msg, Channel channel) throws IOException {
        System.out.println("进入myQueue");
        if (null != msg && null != msg.getBody() && 0 != msg.getBody().length) 
//这里介绍三个参数的含义
            //msg.getMessageProperties().getDeliveryTag()  当前消息的tagId
            //multiple true 确认之前收到的消息,false 只确认当前消息
            //requeue true 重新回到队列 false 不回到队列
           HashMap<String, String> map = (HashMap<String, String>) SerializationUtils.deserialize(msg.getBody());
           if("20".equals(map.get("age"))){
               channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);//消息确认
               System.out.println("消息确认");
           }else if("25".equals(map.get("age"))){
               channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);// 消息取消确认
               System.out.println("消息不确认");
           }else{
               channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);//消息拒绝 返回队列
               System.out.println("消息拒绝");
           }
        }
    }

运行一下查看一下结果

第一次传入age=20的消息 保证发布端和消费端都确认成功

Springboot集成RabbiMQ

由截图可见 发送端已经确认消息发到交换机并且成功路由到匹配队列

消费端进入消费并手动确认消费

 

第二次 修改发送端发送的交换机名称为Mq服务器里不存在的交换机名然后查看结果

修改信息如下

Springboot集成RabbiMQ

结果如下

Springboot集成RabbiMQ

第三次 修改发送端路由为没有匹配队列的路由名

修改如下

Springboot集成RabbiMQ

结果如下

Springboot集成RabbiMQ

由图可见消息被确认发送到了交换机但因没有找到匹配路由键的队列所以消息被退回

 

第四次 观察消费端不确认消息并开启重返队列效果

修改配置如下

Springboot集成RabbiMQ

传入age=25 结果如下

Springboot集成RabbiMQ

由图可见如果一直不做处理的情况下,队列被不确认后会一直重新回到队列被消费

 

第五次 测试拒绝

修改代码如下

Springboot集成RabbiMQ

 

增加一个拒绝条件判断 当一个消息被重复消费超过10次消息拒绝

Springboot集成RabbiMQ

以上就是RabbitMQ的confirm确认机制

 

  • rabbitMQ 配置死信队列
/**
 * @Description: 消息队列配置类
 * @Author: zw
 */
@Configuration
public class RabbitConfig {
    /**
     * 死信队列跟交换机类型没有关系  不影响该类型交换机的特性.
     */
    @Bean
    DirectExchange deadLetterExchange(){
        return new DirectExchange("Dead_Letter_Exchange");
    }
    /**
     * 声明一个死信队列.
     * x-dead-letter-exchange   对应死信交换机转发交换机名
     * x-dead-letter-routing-key  对应死信队列转发路由名
     */
  @Bean
    Queue deadLetterQueue(){
        Map<String, Object> map = new HashMap<String, Object>(2);
        map.put("x-dead-letter-exchange","Dead_Letter_Exchange");
        map.put("x-dead-letter-routing-key","redirect.routing.key");
        return new Queue("Dead_Letter_Queue",true,false,false,map);
    }


    /**
     * 定义死信队列转发队列.
     */
    @Bean
    public Queue redirectQueue() {
        return new Queue("REDIRECT_QUEUE");
    }

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     */
    @Bean
    public Binding deadLetterBinding(){
        return new Binding("Dead_Letter_Queue",Binding.DestinationType.QUEUE,"Dead_Letter_Exchange","dead.letter.routing.key",null);
    }

    @Bean
    public Binding redirectBinding(){
        return new Binding("REDIRECT_QUEUE",Binding.DestinationType.QUEUE,"Dead_Letter_Exchange","redirect.routing.key",null);
    }

 

  1. 申明一个交换机
  2. 配置一个死信队列,并设置死信队列过期消息进行转发的交换机参数和路由键
  3. 将死信队列与交换机进行绑定
  4. 申明一个作为死信队列转发消息的队列
  5. 将转发队列与死信队列之前设置参数的交换机和路由键进行绑定

测试:

@RabbitListener(queues = "REDIRECT_QUEUE")
public void process(Message msg, Channel channel) throws IOException {
    System.out.println("进入myQueue时间为"+new Date().toLocaleString());
public void sendDirectMsg(Object msg){
    CorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());
    System.out.println("消息id" + correlationData.getId());
    MessagePostProcessor messagePostProcessor = message -> {
        MessageProperties messageProperties = message.getMessageProperties();
        //设置编码
        messageProperties.setContentEncoding("utf-8");
        //设置过期时间10*1000毫秒
        messageProperties.setExpiration("10000");
        return message;
    };
    rabbitTemplate.convertAndSend("Dead_Letter_Exchange","dead.letter.routing.key",msg,messagePostProcessor,correlationData);
}

Springboot集成RabbiMQ

 

Springboot集成RabbiMQ

 

Springboot集成RabbiMQ

 

Springboot集成RabbiMQ

由图可见,消息一开始被路由到死信队列,因为没有消费者监听所以当消息的过期时间到了之后被私信队列投递路由到之前设置好的转发队列。