Spring + ActiveMQ 集成详解
1. 下载ActiveMQ并安装到服务器上
解压后,根据操作系统进入对应的目录并执行相应的启动文件。启动后访问管理控制台地址
http://localhost:8161/admin/,账户和密码都是 admin,效果如下:
2. Maven jar 包依赖
<!-- NOTE(review): none of these dependencies declares a <version>; presumably versions are managed by a parent POM or a dependencyManagement section. Confirm before copying. -->

<!-- Spring JMS support (provides JmsTemplate and the message listener containers used below). -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
</dependency>

<!-- ActiveMQ classes such as ActiveMQConnectionFactory and ActiveMQQueue. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>

<!-- Required for org.apache.activemq.pool.PooledConnectionFactory. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>
3. 消息队列连接、会话、模板配置
<!-- The ConnectionFactory that really creates connections; supplied by the JMS vendor (ActiveMQ). -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- ActiveMQ broker address and credentials, read from property placeholders. -->
    <property name="brokerURL" value="${mq.brokerURL}"/>
    <property name="userName" value="${mq.userName}"/>
    <property name="password" value="${mq.password}"/>
</bean>

<!--
    ActiveMQ's PooledConnectionFactory wraps the ActiveMQConnectionFactory and pools
    Connection, Session and MessageProducer instances to reduce resource consumption.
    Requires the activemq-pool artifact.
    destroy-method="stop" releases the pooled connections when the Spring context closes.
-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
    <property name="connectionFactory" ref="targetConnectionFactory"/>
    <property name="maxConnections" value="${mq.pool.maxConnections}"/>
</bean>

<!--
    Spring-side ConnectionFactory that manages the real one.
    NOTE(review): SingleConnectionFactory hands out one shared connection, so the
    maxConnections setting above probably has little effect through this bean. Confirm
    whether CachingConnectionFactory, or the pooled factory used directly, is what is intended.
-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- The target factory that actually produces JMS connections. -->
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<!-- Spring's JMS helper for sending and receiving messages; template for the notify queue. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- Refers to the Spring-provided ConnectionFactory defined above. -->
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestinationName" value="${queueName.notify}"/>
</bean>

<!-- Queue destinations: declare one ActiveMQQueue bean per message queue. -->
<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>${queueName.notify}</value>
    </constructor-arg>
</bean>

<!--
    Second queue; the test class and the listener container reference it by the id sessionAwareQueue2.
    NOTE(review): the placeholder key queueName.test2 is illustrative. Define it in the
    properties file or replace it with the real key.
-->
<bean id="sessionAwareQueue2" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>${queueName.test2}</value>
    </constructor-arg>
</bean>
4. 发送消息到消息队列
5. package com.ec.listener; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * @author liuqi * @Date 2017-06-13. * @describe */ @RunWith(SpringJUnit4ClassRunner.class) //使用junit4进行测试 @ContextConfiguration ({"classpath:spring-context.xml"}) //加载配置文件 public class TestListenerTest { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination sessionAwareQueue; @Autowired private Destination sessionAwareQueue2; @Test public void testSendMessage1(){ jmsTemplate.send(sessionAwareQueue, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage("hello test Queues !"); } }); } @Test public void testSendMessage2(){ jmsTemplate.send(sessionAwareQueue2, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage("hello test2 Queues !"); } }); } }
6. 接收队列中的消息(配置消息监听容器来接收消息)
<!-- Listener bean; as a SessionAwareMessageListener it is handed the JMS Session with each message. -->
<bean id="testSessionAwareMessageListener2" class="com.ec.listener.Test2SessionAwareMessageListener"/>

<!--
    Listener container that consumes from sessionAwareQueue2 and passes each message to the
    listener bean above. Requires a Destination bean with id sessionAwareQueue2 in the context.
-->
<bean id="sessionAwareListenerContainer2"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="messageListener" ref="testSessionAwareMessageListener2"/>
    <property name="destination" ref="sessionAwareQueue2"/>
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
Test2SessionAwareMessageListener 代码如下:
package com.ec.listener; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.apache.activemq.command.ActiveMQTextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.listener.SessionAwareMessageListener; /** * 测试队列2处理 * @author liuqi * @Date 2017-06-13. * @describe */ public class Test2SessionAwareMessageListener implements SessionAwareMessageListener<Message> { private Logger logger = LoggerFactory.getLogger(Test2SessionAwareMessageListener.class); @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination sessionAwareQueue2; /** * 不同的消息可以定义不同的队列监听 * 接收队列消息内容,并进行相应处理 * @param message * @param session */ public synchronized void onMessage(Message message, Session session) { logger.error("测试队列2监听!"); try { ActiveMQTextMessage msg = (ActiveMQTextMessage) message; final String ms = msg.getText(); logger.info("== receive message:" + ms); try{ //// TODO: 2017-06-13 这个地方进行业务处理,并抓住RpcException(业务采用了dubbo服务框架),将消息重新放回队列中,防止调用异常后,消息丢失 }catch (RpcException e){ // 发送消息到消息队列 jmsTemplate.send(sessionAwareQueue2, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(ms); } }); logger.error("消费消息出现了调用异常!"); } } catch (Exception e) { e.printStackTrace(); } } } 深入了解可访问spring API :http://shouce.jb51.net/spring/