activemq与spring整合

消息队列activemq实现了jms标准,而且还可以与spring进行无缝整合,到现在activemq仍然活跃在企业的项目中。在说明activemqspring整合前,首先来了解一下基本概念。源代码链接

1、JMS

JMS即Java消息服务Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的APIActivemq消息队列组件实现了JMS操作规范。

2、JMS消息通信方式

通常有两种方式:点对点(Point-to-Point)和发布与订阅(Publish/Subscribe)。点对点的方式类似与日常生活正的两个手机用户之间的互发短信,只能是一个发送另一个接收;发布与订阅类似于我们关注的公众号,一个公众号可以被多个用户关注和订阅,每当公众号有更新后,我们打开app都会接收到相关推送的消息。

3、JMS操作流程

    按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:

1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。

2. 利用factory构造JMS connection

3. 启动connection

4. 通过connection创建JMS session.

5. 指定JMS destination.

6. 创建JMS producer或者创建JMS message并提供destination.

7. 创建JMS consumer或注册JMS message listener.

8. 发送和接收JMS message.

9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

4、下载安装activeMQ

Activemq下载地址:http://activemq.apache.org/download-archives.html

可下载的版本分为window版和Linux版,本次下载的为linux版本的activemq:apache-activemq-5.9.1-bin.tar.gz

1)、将active mq组件上传到Linux下,而后解压缩:

tar xzvf /srv/ftp/apache-activemq-5.12.0-bin.tar.gz -C /usr/local

2)、配置一个新的用户:activemq

· 修改文件:vim /usr/local/apache-activemq-5.12.0/conf/jetty-realm.properties;

activemq : java,user

activemq与spring整合

(3)、启动ActiveMQ服务:

/usr/local/apache-activemq-5.12.0/bin/activemq start

(4)、打开页面浏览器:输入http://IP:8161,看到以下界面说明activemq安装成功

· 例:http://192.168.28.72:8161

activemq与spring整合

5)、输入http://IP:8161/admin进行用户的登录,而后创建一个Queue,命名为FirstQueue

5、Spring与activemq的整合

建立一个mq的web项目目录结构如下:

activemq与spring整合

5.1在pom.xml文件中引入springmq的依赖

5.2配置生产者和消费者的xml文件

(1)配置spring-productorMQ.xml

[html] view plain copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:context="http://www.springframework.org/schema/context"  
  4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  5.     xmlns:jms="http://www.springframework.org/schema/jms"  
  6.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd   
  7.         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd  
  8.         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd">  
  9.   
  10.     <!-- 定义JmsTemplate的Queue类型 -->  
  11.     <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">  
  12.         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->    
  13.         <constructor-arg ref="connectionFactory" />  
  14.         <!-- 非pub/sub模型(发布/订阅),即队列模式 -->  
  15.         <property name="pubSubDomain" value="false" />  
  16.     </bean>  
  17.   
  18.     <!-- Spring Caching连接工厂, Spring用于管理真正的ConnectionFactory的ConnectionFactory -->    
  19.     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  
  20.         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->    
  21.         <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>  
  22.         <!-- 同上,同理 -->  
  23.         <!-- <constructor-arg ref="amqConnectionFactory" /> -->  
  24.         <!-- Session缓存数量 -->  
  25.         <property name="sessionCacheSize" value="100" />  
  26.     </bean>  
  27.   
  28.     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
  29.     <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
  30.         <property name="brokerURL" value="tcp://192.168.154.158:61616"/>  
  31.         <property name="transportListener"  ref="providerTransportListener"/>   
  32.     </bean>  
  33.       
  34.     <!--Queue目的地-->  
  35.     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  36.         <constructor-arg name="name" value="FirstQueue"></constructor-arg>  
  37.     </bean>   
  38.       
  39.     <!-- 配置MessageListenerContainer -->  
  40.     <bean  
  41.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  42.         <property name="connectionFactory" ref="connectionFactory" />  
  43.         <property name="destination" ref="queueDestination" />  
  44.         <property name="messageListener" ref="myQueueListener" />  
  45.     </bean>  
  46.   
  47.     <!-- 定义JmsTemplate的Topic类型 -->  
  48.     <!-- <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  
  49.          这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象  
  50.         <constructor-arg ref="connectionFactory" />  
  51.         pub/sub模型(发布/订阅)  
  52.         <property name="pubSubDomain" value="true" />  
  53.     </bean> -->  
  54.       
  55.     <!--Topic目的地-->  
  56.     <!-- <bean id="itemTopic" class="org.apache.activemq.command.ActiveMQTopic">  
  57.         <constructor-arg name="name" value="FirstTopic"></constructor-arg>  
  58.     </bean> -->  
  59.       
  60.     <!-- 配置MessageListenerContainer -->  
  61.     <!-- <bean  
  62.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  63.         <property name="connectionFactory" ref="connectionFactory" />  
  64.         <property name="destination" ref="itemTopic" />  
  65.         <property name="messageListener" ref="myTopicListener" />  
  66.     </bean> -->  
  67.       
  68.     <!-- 自定义消息监听器,用于接收queue或topic之后做业务处理 -->  
  69.     <!-- <bean id="myTopicListener" class="cn.fengfeng.activemq.consumer.TopicReceiver"></bean> -->  
  70.       
  71.       
  72. </beans>   

(2)配置spring-consumerMQ.xml

[html] view plain copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:context="http://www.springframework.org/schema/context"  
  4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  5.     xmlns:jms="http://www.springframework.org/schema/jms"  
  6.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd   
  7.         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd  
  8.         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd">  
  9.   
  10.     <!-- Spring Caching连接工厂, Spring用于管理真正的ConnectionFactory的ConnectionFactory -->    
  11.     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  
  12.         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->    
  13.         <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>  
  14.         <!-- 同上,同理 -->  
  15.         <!-- <constructor-arg ref="amqConnectionFactory" /> -->  
  16.         <!-- Session缓存数量 -->  
  17.         <property name="sessionCacheSize" value="100" />  
  18.     </bean>  
  19.   
  20.     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
  21.     <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
  22.         <property name="brokerURL" value="tcp://192.168.154.158:61616"/>  
  23.         <property name="transportListener"  ref="providerTransportListener"/>   
  24.     </bean>  
  25.       
  26.     <!--Queue目的地-->  
  27.     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  28.         <constructor-arg name="name" value="FirstQueue"></constructor-arg>  
  29.     </bean>   
  30.       
  31.     <!-- mq监听类,对mq整体的变化的监控  -->  
  32.     <bean id="providerTransportListener" class="cn.fengfeng.activemq.listener.ProviderTransportListener"/>  
  33.       
  34.     <!-- 自定义消息监听器,用于接收queue或topic之后做业务处理 -->  
  35.     <bean id="myQueueListener" class="cn.fengfeng.activemq.consumer.Receiver"></bean>  
  36.   
  37.     <!-- 配置MessageListenerContainer监听容器 -->  
  38.     <bean  
  39.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  40.         <property name="connectionFactory" ref="connectionFactory" />  
  41.         <property name="destination" ref="queueDestination" />  
  42.         <property name="messageListener" ref="myQueueListener" />  
  43.     </bean>  
  44.   
  45.     <!--Topic目的地-->  
  46.     <!-- <bean id="itemTopic" class="org.apache.activemq.command.ActiveMQTopic">  
  47.         <constructor-arg name="name" value="FirstTopic"></constructor-arg>  
  48.     </bean> -->  
  49.       
  50.     <!-- 配置MessageListenerContainer -->  
  51.     <!-- <bean  
  52.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  53.         <property name="connectionFactory" ref="connectionFactory" />  
  54.         <property name="destination" ref="itemTopic" />  
  55.         <property name="messageListener" ref="myTopicListener" />  
  56.     </bean> -->  
  57.       
  58.     <!-- 自定义消息监听器,用于接收queue或topic之后做业务处理 -->  
  59.     <!-- <bean id="myTopicListener" class="cn.fengfeng.activemq.consumer.TopicReceiver"></bean> -->  
  60.       
  61.       
  62. </beans>   

(3)配置web.xml文件

[html] view plain copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" id="WebApp_ID" version="3.1">  
  3.   <display-name>mq</display-name>  
  4.   <!-- 在WEB容器里面进行Spring容器的加载 -->  
  5.     <listener>  
  6.         <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>  
  7.     </listener>  
  8.     <!-- 此配置描述的是在项目开发过程之中,Spring容器所需要使用到的配置文件 -->  
  9.     <context-param>  
  10.         <param-name>contextConfigLocation</param-name>  
  11.         <param-value>  
  12.             classpath*:spring-*.xml  
  13.         </param-value>  
  14.     </context-param>  
  15.     <!-- 此为Spring MVC配置所需要的程序文件,所有的请求都提交给Spring的Servlet程序 -->  
  16.     <servlet>  
  17.         <servlet-name>springmvc</servlet-name>  
  18.         <!-- 此为Spring MVC自己提供的servlet程序,一定要写上,因为其可以处理用户请求 -->  
  19.         <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>  
  20.         <init-param>  
  21.             <param-name>contextConfigLocation</param-name>  
  22.             <!-- 配置Spring MVC所需要的配置文件 -->  
  23.             <param-value>classpath:spring-mvc.xml</param-value>  
  24.         </init-param>  
  25.     </servlet>  
  26.     <!-- SpringMVC中所有路径的请求映射,使用的是“*.action” -->  
  27.     <servlet-mapping>  
  28.         <servlet-name>springmvc</servlet-name>  
  29.         <url-pattern>*.action</url-pattern>  
  30.     </servlet-mapping>  
  31.     <!-- Spring中提供的编码过滤器,使用的编码都是UTF-8 -->  
  32.     <filter>  
  33.         <filter-name>encoding</filter-name>  
  34.         <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>  
  35.         <init-param>  
  36.             <param-name>encoding</param-name>  
  37.             <param-value>UTF-8</param-value>  
  38.         </init-param>  
  39.     </filter>  
  40.     <filter-mapping>  
  41.         <filter-name>encoding</filter-name>  
  42.         <url-pattern>/*</url-pattern>  
  43.     </filter-mapping>  
  44.   <welcome-file-list>  
  45.     <welcome-file>index.html</welcome-file>  
  46.     <welcome-file>index.htm</welcome-file>  
  47.     <welcome-file>index.jsp</welcome-file>  
  48.     <welcome-file>default.html</welcome-file>  
  49.     <welcome-file>default.htm</welcome-file>  
  50.     <welcome-file>default.jsp</welcome-file>  
  51.   </welcome-file-list>  
  52. </web-app>  

5.4、生产者代码Sender.java

[java] view plain copy
  1. package cn.fengfeng.activemq.productor;  
  2. import javax.annotation.Resource;  
  3. import javax.jms.Destination;  
  4. import javax.jms.JMSException;  
  5. import javax.jms.Message;  
  6. import javax.jms.Session;  
  7.   
  8. import org.springframework.jms.core.JmsTemplate;  
  9. import org.springframework.jms.core.MessageCreator;  
  10. import org.springframework.stereotype.Service;  
  11.   
  12. @Service  
  13. public class Sender {  
  14.     private static final int SEND_NUMBER = 5;  
  15.       
  16.     @Resource  
  17.     private JmsTemplate jmsTemplate ;  
  18.       
  19.     //name属性用来明确使用哪个queue或者topic作为发送消息的Destination  
  20.     @Resource(name = "queueDestination")   
  21.     private Destination queueDestination ;  
  22.       
  23.     public void sendMessage(String message) throws InterruptedException {  
  24.         for (int x = 0 ; x < SEND_NUMBER ; x ++ ) {  
  25.             this.jmsTemplate.send(this.queueDestination, new MessageCreator() {  
  26.                 @Override  
  27.                 public Message createMessage(Session session) throws JMSException {  
  28.                     System.out.println("***" + message + "***");  
  29.                     return session.createTextMessage(message);  
  30.                 }  
  31.             });  
  32.             Thread.sleep(1000) ;  
  33.         }  
  34.           
  35.     }  
  36. }  
5.5、消费者代码Receiver.java
[java] view plain copy
  1. package cn.fengfeng.activemq.consumer;  
  2. import javax.jms.JMSException;  
  3. import javax.jms.Message;  
  4. import javax.jms.MessageListener;  
  5. import javax.jms.TextMessage;  
  6.   
  7. public class Receiver implements MessageListener {  
  8.     @Override  
  9.     public void onMessage(Message message) {  
  10.         if (message instanceof TextMessage ) {  
  11.             TextMessage textMessage = (TextMessage) message;  
  12.             try {  
  13.                 // 生产者发送消息后,就会被此类监听到,而后获取消息,进而做一些业务处理  
  14.                 System.out.println("###" + textMessage.getText() + "###");  
  15.             } catch (JMSException e) {  
  16.                 // TODO Auto-generated catch block  
  17.                 e.printStackTrace();  
  18.             }  
  19.         }     
  20.     }  
  21. }  
5.6、监听类

[java] view plain copy
  1. package cn.fengfeng.activemq.listener;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.activemq.transport.TransportListener;  
  6. import org.slf4j.Logger;  
  7. import org.slf4j.LoggerFactory;  
  8.   
  9. public class ProviderTransportListener implements TransportListener {  
  10.       
  11.     private static final Logger Log = LoggerFactory.getLogger(ProviderTransportListener.class) ;  
  12.     @Override  
  13.     public void onCommand(Object arg0) {  
  14.         // TODO Auto-generated method stub  
  15.         Log.debug("监听到服务命令:{}" + arg0);  
  16.   
  17.     }  
  18.   
  19.     @Override  
  20.     public void onException(IOException arg0) {  
  21.         // TODO Auto-generated method stub  
  22.         Log.debug("出现异常:{}" + arg0);  
  23.     }  
  24.   
  25.     @Override  
  26.     public void transportInterupted() {  
  27.         // TODO Auto-generated method stub  
  28.         Log.debug("传输中断。");  
  29.     }  
  30.   
  31.     @Override  
  32.     public void transportResumed() {  
  33.         // TODO Auto-generated method stub  
  34.         Log.debug("建立连接或传输恢复!");  
  35.     }  
  36.   
  37. }  
5.7、建立一个action

[java] view plain copy
  1. package cn.fengfeng.activemq.controller;  
  2.   
  3. import javax.annotation.Resource;  
  4.   
  5. import org.springframework.stereotype.Controller;  
  6. import org.springframework.web.bind.annotation.RequestMapping;  
  7. import org.springframework.web.servlet.ModelAndView;  
  8.   
  9. import cn.fengfeng.activemq.productor.Sender;  
  10.   
  11. @Controller  
  12. @RequestMapping("/activemq/*")  
  13. public class ActiveMQAction {  
  14.       
  15.     @Resource  
  16.     private Sender sender ;  
  17.       
  18.     @RequestMapping("/send")  
  19.     public ModelAndView send(String message) {  
  20.         try {  
  21.             this.sender.sendMessage(message);  
  22.         } catch (InterruptedException e) {  
  23.             e.printStackTrace();  
  24.         }  
  25.         return null ;  
  26.     }  
  27. }  

将项目部署到tomcat下,先启动activemq的服务,而后启动tomcat,在浏览器中通过访问http://localhost/mq/activemq/send.action?message=helloworld 将会在后台中看到发送的消息和被监听到的消息如下图所示:

activemq与spring整合