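Integrating RabbitMQ into Spring Boot
This example goes beyond simply using RabbitMQ: it wraps the integration in a standalone jar module, so that other projects can send MQ messages by calling a single method the module provides.
The final project structure is as follows:
1. Create a submodule:
After creating it, add the following to its pom.xml: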
<parent>
<groupId>com.example</groupId>
<artifactId>apartment-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
This replaces the original parent tag, which should be deleted.
Then add the following to the parent project's pom.xml:
<modules>
<module>apartment-rabbitmq</module>
</modules>
2. Add the dependency
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
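3. Prepare the environment
Log in to RabbitMQ with the default account:
guest/guest
Add a virtual host:
apartment
Add a user:
aptadmin/aptadmin, and grant it access to the apartment virtual host
Create two queues:
apartment.email.queue
apartment.sms.queue
Create an exchange:
apartment.email.exchange
Bind it to the apartment.email.queue queue with the routing key apartment.email.routinkey
Do the same for sms.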
4. Add the configuration
Because this module is consumed as a third-party jar, application.properties is renamed to rabbitmq.properties:
spring.rabbitmq.host=ip
spring.rabbitmq.port=5672
spring.rabbitmq.username=aptadmin
spring.rabbitmq.password=aptadmin
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=apartment
spring.rabbitmq.sms.exchange=apartment.sms.exchange
spring.rabbitmq.sms.queue=apartment.sms.queue
spring.rabbitmq.sms.routinKey=apartment.sms.routinkey
spring.rabbitmq.email.exchange=apartment.email.exchange
spring.rabbitmq.email.queue=apartment.email.queue
spring.rabbitmq.email.routinKey=apartment.email.routinkey
5. RabbitMQ configuration
package com.rabbitmq.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.messaging.converter.GenericMessageConverter;
import lombok.Getter;
@Configuration
@PropertySource("classpath:rabbitmq.properties")
@Getter
public class RabbitmqConfig {
//@Value("${spring.rabbitmq.host}")
// public String host;
// @Value("${spring.rabbitmq.port}")
// public int port;
// @Value("${spring.rabbitmq.username}")
// public String username;
// @Value("${spring.rabbitmq.password}")
// public String password;
// @Value("${spring.rabbitmq.virtual-host}")
// public String virtualHost;
//
// /**
// Create the connection factory
// */
// @Bean
// public CachingConnectionFactory connectionFactory() {
// CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.host, this.port);
// connectionFactory.setUsername(this.username);
// connectionFactory.setPassword(this.password);
// connectionFactory.setVirtualHost(this.virtualHost);
// connectionFactory.setPublisherConfirms(true);
// return connectionFactory;
// }
///**
// * Create the RabbitAdmin
// */
//@Bean
//public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory)
//throws Exception {
//RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//return rabbitAdmin;
//}
//
///**
// * Initialize the RabbitMessagingTemplate
// */
//@Bean
//public RabbitMessagingTemplate rabbitMessagingTemplate(
//ConnectionFactory connectionFactory) {
//RabbitTemplate template = new RabbitTemplate(connectionFactory);
//RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
//rabbitMessagingTemplate
//.setMessageConverter(new GenericMessageConverter());
//rabbitMessagingTemplate.setRabbitTemplate(template);
//return rabbitMessagingTemplate;
//}
}
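Two points about this configuration class deserve attention:
1. Because the module is used as a third-party jar, there are two ways to supply the connection settings.
First: use the jar's own properties file to configure the MQ address, which means the jar fixes the MQ address. In that case the properties file must be renamed and loaded with a @PropertySource annotation, as in this example.
Second: let the main project use its own configuration file. In that case this module's properties file needs no connection settings and the @PropertySource annotation can be dropped, but the main project must then also define the exchange, queue, and routing key properties that the queue configuration classes read through @Value.
2. Spring Boot provides default implementations of the beans commented out above, so they can be omitted. If you really need to customize them, uncomment them and modify as required.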
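The two options are less exclusive than they may look. As far as Spring Boot's documented property ordering goes, values from the main project's application.properties take precedence over files loaded through @PropertySource, so the jar's rabbitmq.properties can act as a set of defaults that the main project overrides key by key. It is worth verifying this against the Spring Boot version in use. The sketch below is only a plain-Java analogy of that layering built on java.util.Properties, not Spring's own mechanism; the class name LayeredPropertiesSketch and the host name mq.internal.example are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Plain-Java analogy of layered property lookup; this is not Spring's own mechanism.
class LayeredPropertiesSketch {

    // Parses properties text; keys missing here fall back to 'defaults' (may be null).
    static Properties load(String text, Properties defaults) {
        Properties props = new Properties(defaults);
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // The main project's values win; the jar's file only supplies fallbacks.
    static Properties resolve(String jarDefaults, String mainProjectValues) {
        return load(mainProjectValues, load(jarDefaults, null));
    }

    public static void main(String[] args) {
        // Stand-in for the jar's rabbitmq.properties.
        String jarDefaults = "spring.rabbitmq.host=ip\nspring.rabbitmq.port=5672\n";
        // Stand-in for the main project's application.properties (hypothetical host).
        String mainProjectValues = "spring.rabbitmq.host=mq.internal.example\n";

        Properties effective = resolve(jarDefaults, mainProjectValues);
        System.out.println(effective.getProperty("spring.rabbitmq.host")); // mq.internal.example
        System.out.println(effective.getProperty("spring.rabbitmq.port")); // 5672
    }
}
```

Only the host is overridden here; the port still comes from the jar's defaults.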
6. Queue configuration
This example defines two queues, so it needs two @Configuration classes:
package com.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import lombok.Getter;
@Configuration
@Getter
public class EmailQueueConfig {
@Value("${spring.rabbitmq.email.exchange}")
String emailExchange;
@Value("${spring.rabbitmq.email.queue}")
String emailQueue;
@Value("${spring.rabbitmq.email.routinKey}")
String emailRoutingkey;
@Bean
TopicExchange emailTopicExchange() {
return new TopicExchange(emailExchange, true, false);
}
@Bean
Queue emailQueue() {
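/**
Constructor arguments are (name, durable, exclusive, autoDelete).
durable: the queue is persistent, so it does not need to be re-created when RabbitMQ restarts
exclusive: whether the queue is usable only by the current connection, defaults to false
autoDelete: whether the queue is deleted automatically once it is no longer in use, defaults to false
*/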
return new Queue(emailQueue, true, false, false);
}
@Bean
// Bind the emailQueue queue to the exchange
Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(emailTopicExchange()).with(emailRoutingkey);
}
}
package com.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import lombok.Getter;
@Configuration
@Getter
public class SmsQueueConfig {
@Value("${spring.rabbitmq.sms.exchange}")
String smsExchange;
@Value("${spring.rabbitmq.sms.queue}")
String smsQueue;
@Value("${spring.rabbitmq.sms.routinKey}")
String smsRoutingkey;
@Bean
TopicExchange smsTopicExchange() {
return new TopicExchange(smsExchange, true, false);
}
@Bean
Queue smsQueue() {
return new Queue(smsQueue, true, false, false);
}
@Bean
// Bind the smsQueue queue to the exchange
Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(smsTopicExchange()).with(smsRoutingkey);
}
}
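Both configuration classes declare a TopicExchange, yet the binding keys contain no wildcards, so each binding only accepts a routing key identical to its own and the exchange effectively behaves like a direct one. To make the topic rules concrete (`*` stands for exactly one dot-separated word, `#` for zero or more), here is a simplified plain-Java model of the matching. It is an illustrative sketch, not code from Spring AMQP or the broker, and the class name TopicMatchSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of topic-exchange matching; not Spring AMQP or broker code.
class TopicMatchSketch {

    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String bindingPattern, String routingKey) {
        List<String> pattern = Arrays.asList(bindingPattern.split("\\."));
        List<String> key = Arrays.asList(routingKey.split("\\."));
        return matches(pattern, 0, key, 0);
    }

    private static boolean matches(List<String> pattern, int p, List<String> key, int k) {
        if (p == pattern.size()) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return k == key.size();
        }
        String word = pattern.get(p);
        if (word.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = k; next <= key.size(); next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.size()) {
            return false;
        }
        // '*' accepts any single word; a literal must be identical.
        if (word.equals("*") || word.equals(key.get(k))) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // Binding key from this article: exact match only.
        System.out.println(matches("apartment.email.routinkey", "apartment.email.routinkey")); // true
        System.out.println(matches("apartment.email.routinkey", "apartment.sms.routinkey"));   // false
        // Wildcard patterns, shown for comparison only.
        System.out.println(matches("apartment.*.routinkey", "apartment.sms.routinkey"));       // true
        System.out.println(matches("apartment.#", "apartment.email.routinkey"));               // true
    }
}
```

If wildcard routing is never needed, a DirectExchange would express the same intent more directly; the topic type simply leaves room for patterns such as apartment.# later.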
7. Producers
One producer per queue:
package com.rabbitmq.producer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.rabbitmq.config.EmailQueueConfig;
@Service
public class EmailMQProducer {
private Log log = LogFactory.getLog(this.getClass());
@Autowired
EmailQueueConfig emailQueueConfig;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
log.info("send start.....");
rabbitTemplate.convertAndSend(
emailQueueConfig.getEmailExchange(),
emailQueueConfig.getEmailRoutingkey(),
message);
}
}
package com.rabbitmq.producer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.rabbitmq.config.SmsQueueConfig;
@Service
public class SmsMQProducer {
private Log log = LogFactory.getLog(this.getClass());
@Autowired
SmsQueueConfig smsQueueConfig;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
log.info("send start.....");
rabbitTemplate.convertAndSend(
smsQueueConfig.getSmsExchange(),
smsQueueConfig.getSmsRoutingkey(),
message);
}
}
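The two producers differ only in which exchange and routing key they pass along, so a caller that wants a single entry point could put a thin facade in front of them. The sketch below shows one possible shape under stated assumptions: MqFacadeSketch, Channel, and Sender are hypothetical names that do not exist in this project, the Sender interface merely stands in for RabbitTemplate.convertAndSend(exchange, routingKey, message), and the routes are hard-coded from rabbitmq.properties so the sketch runs without Spring or a broker.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical single-entry-point facade; none of these names exist in the project.
class MqFacadeSketch {

    enum Channel { EMAIL, SMS }

    // Stand-in for RabbitTemplate.convertAndSend(exchange, routingKey, message).
    interface Sender {
        void send(String exchange, String routingKey, String message);
    }

    // Exchange and routing key for one channel.
    static final class Route {
        final String exchange;
        final String routingKey;

        Route(String exchange, String routingKey) {
            this.exchange = exchange;
            this.routingKey = routingKey;
        }
    }

    private final Map<Channel, Route> routes = new EnumMap<>(Channel.class);
    private final Sender sender;

    MqFacadeSketch(Sender sender) {
        this.sender = sender;
        // Values copied from rabbitmq.properties; a real module would inject them instead.
        routes.put(Channel.EMAIL, new Route("apartment.email.exchange", "apartment.email.routinkey"));
        routes.put(Channel.SMS, new Route("apartment.sms.exchange", "apartment.sms.routinkey"));
    }

    // Looks up the route for the channel and hands the message to the sender.
    void send(Channel channel, String message) {
        Route route = routes.get(channel);
        sender.send(route.exchange, route.routingKey, message);
    }

    public static void main(String[] args) {
        // A printing sender replaces the real RabbitTemplate for this demo.
        MqFacadeSketch facade = new MqFacadeSketch(
                (exchange, routingKey, message) ->
                        System.out.println(exchange + " / " + routingKey + " / " + message));
        facade.send(Channel.EMAIL, "email message 1");
        facade.send(Channel.SMS, "sms message 1");
    }
}
```

In the real module the lambda would be replaced by a call on the injected RabbitTemplate; keeping the sender behind an interface also makes the routing easy to unit test.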
8. Testing
In another project, add this jar as a dependency:
<dependency>
<groupId>com.example</groupId>
<artifactId>apartment-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
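Because both are submodules of the same parent project, the dependency can be referenced directly; otherwise the jar has to be published to a private Maven repository first.
Add a controller for testing:
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import com.rabbitmq.producer.EmailMQProducer;
import com.rabbitmq.producer.SmsMQProducer;
import io.swagger.annotations.Api;
@Controller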
@Api(description = "消息队列")
public class RabbitmqController {
private static final Logger logger = LoggerFactory.getLogger(RabbitmqController.class);
@Resource
private EmailMQProducer emailMQProducer;
@Resource
private SmsMQProducer smsMQProducer;
@RequestMapping(value = "/sendemail", method = RequestMethod.GET)
public String sendEmail(){
String message = "email message 1";
emailMQProducer.send(message);
return "home";
}
@RequestMapping(value = "/sendsms", method = RequestMethod.GET)
public String sendSms(){
String message = "sms message 1";
smsMQProducer.send(message);
return "home";
}
}
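Request: http://localhost:8080/sendsms
The message now shows up in the MQ.
9. Consumer
Because this module is a jar that serves other projects, the consumer belongs in the main project. This module includes a demo that can be copied into the main project as-is, or the main project can write its own class, which is just as simple: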
package com.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
*
* Example MQ consumer. It has no effect inside this jar; copy it into the main project to use it.
*
*/
@Component
@RabbitListener(queues = "apartment.email.queue")
public class ConsumerDemo{
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
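Once the class is in the main project and the main project is started, the three messages sent above are printed one by one and disappear from the queue.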