Spring boot集成RabbitMQ的简单使用

最近消息队列的使用比较频繁,目前我使用比较多的就是RabbitMQ了,在项目中一般使用消息队列的场景有如下几个地方。

1.异步的处理:比如在注册,或者项目中状态改变需要给对应的角色发送邮件,短信的时候。应该采用消息队列把事件放入队列,让发送邮件的服务去做发送的事件。

2.应用解耦:比如订单和库存的系统中,原来是有了订单就会通知库存发生改变。如果库存发生未知错误,那么订单也会失败。这样是不大合理的。现在是将订单系统和库存系统分离。将订单消息持久化到消息队列中,就算库存系统失效,也能根据持久化的消息来保持系统的完整性。

3.日志系统:日志直接写入数据库,当有大量数据的时候,数据库压力过大。可以采用Kalfka消息中间件处理。

目前RabbitMQ的抽象消息模型如下:

生产者(producer)创建消息,消费者(consumer)订阅某个队列。然后发布到队列(queue)中,最后将消息发送到监听的消费者。

Spring boot集成RabbitMQ的简单使用

上面只是一个抽象的模型,由于RabbitMQ实现了AMQP 协议,所以实际上也是AMQP的模型。多了一个exchange,生产者的消息不会直接发送给队列,而是经过exchange去分发给队列。

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。

RabbitMQ常用的Exchange Type有三种:fanout、direct、topic:

      fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key完全匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。

下面我们直接代码实践一下:

先编写客户端,如果还没有安装RabbitMQ,可以先安装,我是安装在ubuntu上的。可以看我的这篇文章。https://blog.****.net/qq_32532321/article/details/82862485

1.先在IDEA创建一个新的spring boot项目

2.配置依赖

<!-- RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 

3.在配置文件中添加配置信息,我使用的yml文件。

spring:
 rabbitmq:
    host: (rabbitmq的主机ip)
    port: 5672(端口,默认没有修改就是5672)
    username: (账号)
    password: (密码)
    virtual-host: /        (默认就是/)
    publisher-confirms: true

4.新建java的上生产者配置项

@Configuration
public class RqProducerConfig {
    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    
    //声明一个队列 队列名queue.string
    @Bean
    Queue queueString(RabbitAdmin rabbitAdmin) {
        Queue queue = new Queue("queue.string", true);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

   
    //声明一个exchange 是topic形式的
    @Bean
    TopicExchange exchange(RabbitAdmin rabbitAdmin) {
        TopicExchange topicExchange = new TopicExchange("exchange");
        rabbitAdmin.declareExchange(topicExchange);
        return topicExchange;
    }

    
    //把exchange和队列绑定起来,生产者发送消息到exchange,exchange会将消息发送到对应的队列
    @Bean
    Binding bindingExchangeString(Queue queueString, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        Binding binding = BindingBuilder.bind(queueString).to(exchange).with("queue.string");
        rabbitAdmin.declareBinding(binding);
        return binding;
    }


    /**
     * 生产者用 可以用来转化为json
     *
     * @return
     */
    @Bean
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
        rabbitMessagingTemplate.setMessageConverter(jackson2Converter());
        rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);
        return rabbitMessagingTemplate;
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }
}

5.编写发送的服务层接口和实现

public interface SendStringRqService {
    void sendString(String message);
}
@Service
public class SendStringRqServiceImpl implements SendStringRqService {
   
    @Autowired
    private RabbitMessagingTemplate rabbitTemplate;

    @Override
    public void sendEmail(String message) {
          rabbitTemplate.convertAndSend("exchange", "queue.string", message);
    }

}

6.客户端测试类编写

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitDemoApplicationTests {


    @Autowired
    private SendStringRqService sendStringRqService;

    @Test
    public void SendStringService() {
                sendStringRqService.sendEmail(new Date(System.currentTimeMillis())+" ------> hello world");
    }
}

我们再编写消费端代码

1.首先配置依赖和yml文件。这里跟客户端配置一样。可以直接看客户端配置就好了。

2.java配置项

@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {
    //这里是接受消息,进行处理的类
    @Autowired
    ReceiverService receiverService;

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // factory.setPrefetchCount(5);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

}

 

3.最后就是接受配置的类的编写,ReceiverService的编写

@Component
public class ReceiverService {


    //监听队列的名称,也就是客户端声明的queue.string
    @RabbitListener(queues = "queue.string")
    public void receiveStringQueue(String message) {
        System.out.println("Received string<" + message + ">");
        System.out.println("Received string<" + new Date(System.currentTimeMillis()) + ">");
       
    }
}

配置好之后直接运行消费者端代码。这个时候查看RabbitMQManage的界面,会有个连接可以看到。

Spring boot集成RabbitMQ的简单使用再运行客户端的测试用例

Spring boot集成RabbitMQ的简单使用

执行成功后看到消费者端的控制台输出:

Spring boot集成RabbitMQ的简单使用

这样基础的rabbitmq就用spring boot整合完成了。如果需要更多功能,可以根据这个示例去完成。