Spring+activeMq集成详解

1.      下载ActiveMQ并安装到服务器上

http://activemq.apache.org/

解压后,根据系统进入对应的目录且执行相应的文件。运行后访问地址

http://localhost:8161/admin/,账户和密码都是admin,效果如下:

Spring+activeMq集成详解

2.       Maven 架包依赖

<dependency>
 
<groupId>org.springframework</groupId>
 
<artifactId>spring-jms</artifactId>
</
dependency>
<
dependency>
 
<groupId>org.apache.activemq</groupId>
 
<artifactId>activemq-all</artifactId>
</
dependency>
<
dependency>
 
<groupId>org.apache.activemq</groupId>
 
<artifactId>activemq-pool</artifactId>
</
dependency>

3.      消息队列链接、会话、模版配置

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <!-- ActiveMQ服务地址 -->
  <property name="brokerURL" value="${mq.brokerURL}"/>
  <property name="userName" value="${mq.userName}"></property>
  <property name="password" value="${mq.password}"></property>
</bean>

<!--
  ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
  可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。
  要依赖于 activemq-pool包
 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
  <property name="connectionFactory" ref="targetConnectionFactory"/>
  <property name="maxConnections" value="${mq.pool.maxConnections}"/>
</bean>

<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->

<!-- 通知队列模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="defaultDestinationName" value="${queueName.notify}"></property>
</bean>
<!--这个是sessionAwareQueue目的地,多个消息队列,配置多个 -->
<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg>
    <value>${queueName.notify}</value>
  </constructor-arg>
</bean>

4.      发送消息到消息队列

5.   package com.ec.listener;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @author liuqi
 * @Date 2017-06-13.
 * @describe
 */
@RunWith(SpringJUnit4ClassRunner.class//使用junit4进行测试
@ContextConfiguration
    ({"classpath:spring-context.xml"}) //加载配置文件
public class TestListenerTest {
  @Autowired
  private JmsTemplate jmsTemplate;
  @Autowired
  private Destination sessionAwareQueue;
  @Autowired
  private Destination sessionAwareQueue2;

  @Test
  public void testSendMessage1(){
    jmsTemplate.send(sessionAwareQueue, new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage("hello test Queues !");
      }
    });
  }

  @Test
  public void testSendMessage2(){
    jmsTemplate.send(sessionAwareQueue2, new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage("hello test2 Queues !");
      }
    });
  }
}

 

6.      接收队列里面的消息(配置消息侦听容器进行接收消息)

 

<!-- 可以获取session的MessageListener -->
<bean id="testSessionAwareMessageListener2"
  class="com.ec.listener.Test2SessionAwareMessageListener"></bean>

<bean id="sessionAwareListenerContainer2"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="destination" ref="sessionAwareQueue2"/>
  <property name="messageListener" ref="testSessionAwareMessageListener2"/>
</bean>
Test2SessionAwareMessageListener 代码如下
package com.ec.listener;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.SessionAwareMessageListener;

/**
 * 测试队列2处理
 * @author liuqi
 * @Date 2017-06-13.
 * @describe
 */
public class Test2SessionAwareMessageListener implements SessionAwareMessageListener<Message> {
  private Logger logger = LoggerFactory.getLogger(Test2SessionAwareMessageListener.class);
  @Autowired
  private JmsTemplate jmsTemplate;
  @Autowired
  private Destination sessionAwareQueue2;

  /**
   * 不同的消息可以定义不同的队列监听
   * 接收队列消息内容,并进行相应处理
   * @param message
   * @param session
   */
  public synchronized void onMessage(Message message, Session session) {
    logger.error("测试队列2监听!");
    try {
      ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
      final String ms = msg.getText();
      logger.info("== receive message:" + ms);

      try{
        //// TODO: 2017-06-13  这个地方进行业务处理,并抓住RpcException(业务采用了dubbo服务框架),将消息重新放回队列中,防止调用异常后,消息丢失

      }catch (RpcException  e){
//        发送消息到消息队列
        jmsTemplate.send(sessionAwareQueue2, new MessageCreator() {
          public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(ms);
          }
        });
        logger.error("消费消息出现了调用异常!");
      }


    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

深入了解可访问spring API :http://shouce.jb51.net/spring/