rabbitmq_集成到springboot

本例不止是使用rabbitmq,还将它封装到一个jar工程中,第三方发mq消息时只需要使用它提供的一个方法就行了,使用非常方便。

最终的项目结构如下:

 rabbitmq_集成到springboot

1. 创建一个子项目:

 rabbitmq_集成到springboot

创建完成后,在pom.xml中添加一段:

<parent>

  <groupId>com.example</groupId>

  <artifactId>apartment-parent</artifactId>

  <version>0.0.1-SNAPSHOT</version>

</parent>

删除原始的parent标签。

同时,在父项目的pom.xml中添加一段配置:

<modules>

<module>apartment-rabbitmq</module>

</modules>


2. 添加依赖

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

3. 准备环境

访问:http://ip:15672

guest/guest

添加虚拟主机:

apartment

添加用户:

aptadmin/aptadmin,并挂到apartment主机下

创建两个队列:

apartment.email.queue

apartment.sms.queue

创建交换机:

apartment.email.exchange

并绑定apartment.email.queue队列,routing key为:apartment.email.routinkey

 sms同理.

4. 添加配置

由于是用做第三方包使用,因此将application.properties改名为rabbitmq.properties

spring.rabbitmq.host=ip

spring.rabbitmq.port=5672

spring.rabbitmq.username=aptadmin

spring.rabbitmq.password=aptadmin

spring.rabbitmq.publisher-confirms=true

spring.rabbitmq.virtual-host=apartment

 

spring.rabbitmq.sms.exchange=apartment.sms.exchange

spring.rabbitmq.sms.queue=apartment.sms.queue

spring.rabbitmq.sms.routinKey=apartment.sms.routinkey

 

spring.rabbitmq.email.exchange=apartment.email.exchange

spring.rabbitmq.email.queue=apartment.email.queue

spring.rabbitmq.email.routinKey=apartment.email.routinkey

 

5. Rqbbimq配置

package com.rabbitmq.config;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitAdmin;

import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.PropertySource;

import org.springframework.messaging.converter.GenericMessageConverter;

import lombok.Getter;

@Configuration  

@PropertySource("classpath:rabbitmq.properties")

@Getter

public class RabbitmqConfig {

//@Value("${spring.rabbitmq.host}")  

//    public String host;  

//    @Value("${spring.rabbitmq.port}")  

//    public int port;  

//    @Value("${spring.rabbitmq.username}")  

//    public String username;  

//    @Value("${spring.rabbitmq.password}")  

//    public String password;  

//    @Value("${spring.rabbitmq.virtual-host}")  

//    public String virtualHost;  

//  

//    /**  

//        创建连接工厂  

//    */  

//    @Bean  

//    public CachingConnectionFactory connectionFactory() {  

//        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.host, this.port);  

//        connectionFactory.setUsername(this.username);  

//        connectionFactory.setPassword(this.password);  

//        connectionFactory.setVirtualHost(this.virtualHost);  

//        connectionFactory.setPublisherConfirms(true);  

//        return connectionFactory;  

//    }  

///**

// * 创建 RabbitAdmin

// */

//@Bean

//public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory)

//throws Exception {

//RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

//return rabbitAdmin;

//}

//

///**

// * 初始化 RabbitMessagingTemplate

// */

//@Bean

//public RabbitMessagingTemplate rabbitMessagingTemplate(

//ConnectionFactory connectionFactory) {

//RabbitTemplate template = new RabbitTemplate(connectionFactory);

//RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();

//rabbitMessagingTemplate

//.setMessageConverter(new GenericMessageConverter());

//rabbitMessagingTemplate.setRabbitTemplate(template);

//return rabbitMessagingTemplate;

//}

}

此配置文件需要重点说一下:

1. 由于是作为第三方包使用,可以有两个选择,

第一:使用jar包自身的配置文件配置mq地址,即jar固定了mq地址,这样的话就需要修改配置文件名,并在配置文件添加propertysource注解,如本例。

第二:主项目使用自己的配置文件,那本工程的配置文件不需要配置,而且这里也不用加注解。

2. 上面注释掉的几个beanspringboot都有默认实现,因此可以不写,如果非得自定义的话可以放开了进行修改。

 

6. 各队列配置

本例中写了两个队列,因此需要两个configuration配置类:

package com.rabbitmq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import lombok.Getter;

@Configuration

@Getter

public class EmailQueueConfig {

@Value("${spring.rabbitmq.email.exchange}")

String emailExchange;

@Value("${spring.rabbitmq.email.queue}")

String emailQueue;

@Value("${spring.rabbitmq.email.routinKey}")

String emailRoutingkey;

@Bean

TopicExchange emailTopicExchange() {

return new TopicExchange(emailExchange, true, false);

}

 

@Bean

Queue emailQueue() {

 /**  

         durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列  

         auto-delete 表示消息队列没有在使用时将被自动删除 默认是false  

         exclusive  表示该消息队列是否只在当前connection生效,默认是false

         */  

return new Queue(emailQueue, true, false, false);

}

 

@Bean

//  将消息队列emailQueue和交换机进行绑定  

Binding emailBinding() {

return BindingBuilder.bind(emailQueue()).to(emailTopicExchange()).with(emailRoutingkey);

}

}

 

package com.rabbitmq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import lombok.Getter;

@Configuration

@Getter

public class SmsQueueConfig {

@Value("${spring.rabbitmq.sms.exchange}")

String smsExchange;

@Value("${spring.rabbitmq.sms.queue}")

String smsQueue;

@Value("${spring.rabbitmq.sms.routinKey}")

String smsRoutingkey;

 

@Bean

TopicExchange smsTopicExchange() {

return new TopicExchange(smsExchange, true, false);

}

 

@Bean

Queue smsQueue() {

return new Queue(smsQueue, true, false, false);

}

 

@Bean

//  将消息队列emailQueue和交换机进行绑定  

Binding smsBinding() {

return BindingBuilder.bind(smsQueue()).to(smsTopicExchange()).with(smsRoutingkey);

}

}

 

7. 生产者

两个队列两个生产者:

package com.rabbitmq.producer;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import com.rabbitmq.config.EmailQueueConfig;

@Service

public class EmailMQProducer {

private Log log = LogFactory.getLog(this.getClass());

@Autowired

EmailQueueConfig emailQueueConfig;

    @Autowired  

    private RabbitTemplate rabbitTemplate;  

    

public void send(String message) {

log.info("send start.....");

rabbitTemplate.convertAndSend(

emailQueueConfig.getEmailExchange(),

emailQueueConfig.getEmailRoutingkey(),

message);

}

}

 

package com.rabbitmq.producer;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import com.rabbitmq.config.SmsQueueConfig;

@Service

public class SmsMQProducer {

private Log log = LogFactory.getLog(this.getClass());

@Autowired

SmsQueueConfig smsQueueConfig;

    @Autowired  

    private RabbitTemplate rabbitTemplate;  

    

public void send(String message) {

log.info("send start.....");

rabbitTemplate.convertAndSend(

smsQueueConfig.getSmsExchange(),

smsQueueConfig.getSmsRoutingkey(),

message);

}

}

 

8. 测试

在另一项目中,引入此jar包:

<dependency>

<groupId>com.example</groupId>

<artifactId>apartment-rabbitmq</artifactId>

<version>0.0.1-SNAPSHOT</version>

</dependency>

因为是同一父项目下的子项目,因此可以直接引,否则需要通过私服。

添加controller测试:

@Controller

@Api(description = "消息队列")

public class RabbitmqController {

private static final Logger logger = LoggerFactory.getLogger(RabbitmqController.class);

    @Resource

    private EmailMQProducer emailMQProducer;

    @Resource

    private SmsMQProducer smsMQProducer;

    

@RequestMapping(value = "/sendemail", method = RequestMethod.GET)

public String sendEmail(){

String message = "email message 1";

emailMQProducer.send(message); 

return "home";

}

@RequestMapping(value = "/sendsms", method = RequestMethod.GET)

public String sendSms(){

String message = "sms message 1";

smsMQProducer.send(message); 

return "home";

}

}

执行:http://localhost:8080/sendsms

可以看到mq中有消息了

  

9. 消费者

由于此工程为jar包为第三方提供服务,因此消费者写到主项目自己项目中才合理,在本项目中写了一个demo,使用时只需要将demo复制到主项目中即可使用,或者主项目中自己写一个类也很简单:

package com.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/**

 *

 * 此类为一个mq消费者的示例demo,在此jar包中无效,可直接拷贝到主项目中使用。

 *

 */

@Component

@RabbitListener(queues = "apartment.email.queue")

public class ConsumerDemo{

    @RabbitHandler

    public void process(String hello) {

        System.out.println("Receiver  : " + hello);

    }

}

放到主项目中后,启动主项目,上面三条消息逐一被打印,队列中也消失了。