SSM、SpringBoot框架下ActiveMQ互通最佳实践
SSM、SpringBoot框架下ActiveMQ验证
n SSM框架下
网上的关于此类资料五花八门,有的还是错的,而且普遍的一个问题就是配置和使用太乱了,一个简单的东西弄的很复杂,所以结合实际使用情况与自己的一些理解整理了一个简单易用的方案。
我使用的spring版本为4.1.x,其他4.x版本应该差不多。
1. 加入依赖:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- activeMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
假如不是使用的maven管理依赖,则需要自己配置依赖的jar,这里不多说,贴个依赖关系图看看。
2. 自定义的mq配置,主要是队列名,activemq的服务地址等信息
配置文件mq.properties内容
activemq_url=tcp://127.0.0.1:61616
activemq_username=admin
activemq_password=admin
jms_send_node=service.test
jms_receive_node=service.idea.test
3. 接下来集成activemq到spring容器中,最简单最明白的配置文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq_url}" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!-- 发送的 -->
<!--队列目的地,点对点 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>${jms_send_node}</value>
</constructor-arg>
</bean>
<!--主题目的地,一对多 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopic" />
</bean>
<!-- end发送的 -->
<!-- 接收的 -->
<!--队列目的地,点对点 -->
<bean id="queueDestinationJS" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>${jms_receive_node}</value>
</constructor-arg>
</bean>
<!--主题目的地,一对多 -->
<bean id="topicDestinationJS" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopic" />
</bean>
<!-- end接收的 -->
<bean id="myMessageListener" class="com.im.web.message.MyMessageListener"/>
<!-- 队列消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestinationJS" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<!-- 主题订阅消息监听容器 -->
<bean id="myMessageListener1" class="com.im.web.message.MyMessageListener1"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestinationJS" />
<property name="messageListener" ref="myMessageListener1" />
</bean>
</beans>
这个配置分别配置了发送、接收的功能,没有指定message转换的东西,我觉得那个玩意完全可以自己在代码中写,写个工厂类搞定,只要把不同对象转换为相同数据结构的message对象即可,灵活又高可用。
4. 发送消息的服务接口与实现
public interface MQService {
public void testSendMessage(Object obj);
}
@Service
public class MQServiceImpl implements MQService {
/**
* 注入jms的模板,用于收发消息
*/
@Resource
private JmsTemplate jmsTemplate;
/**
* 注入队列的目的地
*/
@Resource
private Destination queueDestination;
@Override
public void testSendMessage(Object obj) {
jmsTemplate.send(queueDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
if (obj instanceof TbUser) {
TbUser tbUser = (TbUser) obj;
mapMessage.setString("message", tbUser.getUsername() + "-----登陆了");
mapMessage.setString("status", "success");
} else {
mapMessage.setString("message", "生产者发生了异常,传入数据类型错误");
mapMessage.setString("status", "fail");
}
return mapMessage;
}
});
}
}
说明:我做的这个mq的发送服务的一个实现类,主要是系统登陆(自己开发的系统登陆controller)的成功的时候发送一个消息到队列,等待消费者去获取。
5. 消息接收,主要是接收其他系统发出的消息,通过监听对应的队列接收消息,主要实现jmx下的MessageListener 接口onMessage方法。
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("来自idea消息提供者的信息:" + ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
6. 然后启动系统。
n SpringBoot框架
springboot是“快速版”的spring,整合了目前很多流行的中间件、模块,精简了很多spring下复杂的集成配置文件,是开箱即用的开发框架,是快速开发的厉器,接下来用它实现activeMQ的消息发动接收功能。
1. 添加依赖(gradle管理)
org.springframework.boot:spring-boot-starter-activemq
org.springframework.boot:spring-boot-configuration-processor读取自定义配置文件需要用到这个包
2. 简单的activeMQ的配置
aplication.yml配置文件内容
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
in-memory: true
pool:
enabled: false
mq.properties自定义配置文件内容
mqsendnode=service.idea.test
读取自定义配置文件的配置类对象,以后获取配置内容直接使用下面的类,取其属性值就ok了。
@Component
@PropertySource("classpath:mq.properties")
@ConfigurationProperties
public class MQProperties {
private String mqsendnode;
private String mqreceivenode;
public void setMqreceivenode(String mqreceivenode) {
this.mqreceivenode = mqreceivenode;
}
public void setMqsendnode(String mqsendnode) {
this.mqsendnode = mqsendnode;
}
public String getMqsendnode() {
return mqsendnode;
}
public String getMqreceivenode() {
return mqreceivenode;
}
}
3. 生产者,也就是消息发送服务。
@Service("producer")
@EnableScheduling
public class JMSProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private MQProperties mqProperties;
public void sendMessage(String message){
Destination destination = new Queue() {
@Override
public String getQueueName() throws JMSException {
return mqProperties.getMqsendnode();
}
};
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message+ new SimpleDateFormat("yyyy-HH-mm hh:MM:ss").format(new Date()));
}
});
//jmsTemplate.convertAndSend(destination, message);
}
@Scheduled(initialDelay = 1000,fixedRate = 3000)
private void testRun(){
sendMessage("我是idea的消息生产者");
}
}
4. 消费者,也就是消息接收者(这里接收SSM那个系统发出的消息)。
@Component
public class Consumer {
@JmsListener(destination = "service.test")
public void receiveQueue(Object obj) throws Exception{
// 这里其实可以用工厂
if(obj instanceof String){
System.out.println("Consumer收到的消息为:"+obj);
} else if(obj instanceof MapMessage){
MapMessage message = (MapMessage) obj;
System.out.println("Consumer收到的消息为:" + message.getString("message"));
}
}
}
5. Application启动类中开启支持JMS
@SpringBootApplication
@EnableJms
public class ActivemqApplication {
public static void main(String[] args) {
SpringApplication.run(ActivemqApplication.class, args);
}
}
6. 启动run。
n 两系统消息互通(互发互收消息)
SSM是登陆后,会发消息到队列,SpringBoot中会接收;SpringBoot是定时发送消息,此消息带时间戳,每3秒钟发送一条消息到队列,SSM会实时接收。
启动两个系统,看效果。
1. 两个队列在activeMQ中的消息信息。
2. SSM中接收的消息:
3. SpringBoot中接收的消息:(当SSM系统点击登陆后)