spring jms
简介
ActiveMQ 是开源的JMS实现,Geronimo应用服务器就是使用的ActiveMQ提供JMS服务。ActiveMQ5.0相比以前版本提供了一些非常有用的新功能:
- AMQ Message Store (Faster Persistence!)
- Cursors (To handle very large number of stored messages)
- Blob Messages
- Command Agent
- Enterprise Integration Patterns via Camel Integration
- Logging a warning if you forget to start a Connection
- Message Transformation
- Mirrored Queues
- Flow Control
鉴于目前关于ActiveMQ5.0的文章比较少,故准备写一系列ActiveMQ的使用方面的文章。本篇先从安装开始。
安装
- 在http://activemq.apache.org/download.html 下载5.0.0发行包,解压到需要安装ActiveMQ的文件夹,记为/path/to/activemq。
- unix环境activemq文件夹需要执行权限,执行如下命令 chmod -R 755 /path/to/activemq
启动
- window环境运行/path/to/activemq/bin/activemq.bat
- unix环境运行/path/to/activemq/bin/activemq
测试
ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动
- window环境运行 netstat -an|find "61616"
- unix环境运行netstat -an|grep 61616
监控
ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。
admin:http://127.0.0.1:8161/admin/
demo:http://127.0.0.1:8161/demo/
点击demo应用中的“ Market data publisher ”,就会发一些测试的消息。转到admin页面的topics menu下面(queue和topic的区别见 http://andyao.iteye.com/blog/153173 ),可以看到消息在增长。
配置
ActiveMQ5.0的配置文件在/path/to/activemq/conf目录下面。主要配置文件为activemq.xml
简介
上一篇http://www.iteye.com/topic/15317介绍了ActiveMQ5.0的安装,这一篇将介绍的配置。ActiveMQ包含了很多features(详见http://activemq.apache.org/features.html ),
不同的需求,不同的环境,需要不同的features,当然需要不同的配置。在这里我只写了最基本的配置,算是抛砖了,希望引出更多关于ActiveMQ的高级配置。
假设已经正确安装ActiveMQ5.0,同时及其IP地址为192.168.1.148,具体使用时可以改为自己的IP。下面讲解的配置实现的features如下:
- 客户端可以通过tcp://192.168.1.148连接ActiveMQ。
- 消息持久化保存,重启服务器不会丢失消息。
- 可以通过http://192.168.1.148:8161/admin监控ActiveMQ服务器
配置
ActiveMQ默认使用的是XML格式配置,从4.0版本开始用MBean的方式实现XML配置,配置文件在${activemq.home}/conf目录下,文件名为activemq.xml。最新的默认配置见
http://svn.apache.org/repos/asf/activemq/trunk/assembly/src/release/conf/activemq.xml 。下面为本篇文章使用的配置,及重要部分的解释。
- <beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.org/config/1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
- http://activemq.apache.org/camel/schema/spring>
- <!-- persistent="true"表示要持久化存储消息,和子元素persistenceAdapter结合使用 -->
- <!-- dataDirectory默认的存储持久化数据的目录 -->
- <!-- brokerName 设置broker的name,在注意在网络上必须是唯一的-->
- <!-- 更多参考http://activemq.apache.org/xbean-xml-reference-50.html#XBeanXMLReference5.0-brokerelement -->
- <broker xmlns="http://activemq.org/config/1.0" brokerName="192.168.1.148" persistent ="true" dataDirectory="${activemq.base}/data" useShutdownHook="false">
- <!-- Destination specific policies using destination names or wildcards -->
- <!-- wildcards意义见http://activemq.apache.org/wildcards.html -->
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <!-- 这里使用了wildcards,表示所有以EUCITA开头的topic -->
- <policyEntry topic="EUCITA.>" producerFlowControl="false" memoryLimit="10mb">
- <!-- 分发策略 -->
- <dispatchPolicy>
- <!-- 按顺序分发 -->
- <strictOrderDispatchPolicy/>
- </dispatchPolicy>
- <!-- 恢复策略-->
- <subscriptionRecoveryPolicy>
- <!-- 只恢复最后一个message -->
- <lastImageSubscriptionRecoveryPolicy/>
- </subscriptionRecoveryPolicy>
- </policyEntry>
- </policyEntries>
- </policyMap>
- </destinationPolicy>
- <!-- The transport connectors ActiveMQ will listen to -->
- <transportConnectors>
- <transportConnector name="openwire" uri="tcp://192.168.1.148:61616" discoveryUri="multicast://default"/>
- <transportConnector name="ssl" uri="ssl://192.168.1.148:61617"/>
- <transportConnector name="stomp" uri="stomp://192.168.1.148:61613"/>
- <transportConnector name="xmpp" uri="xmpp://192.168.1.148:61222"/>
- </transportConnectors>
- <!-- 消息持久化方式 -->
- <persistenceAdapter>
- <amqPersistenceAdapter directory="${activemq.base}/data"/>
- </persistenceAdapter>
- </broker>
- <!-- lets create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
- <commandAgent xmlns="http://activemq.org/config/1.0"/>
- <!-- An embedded servlet engine for serving up the Admin console -->
- <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
- <connectors>
- <nioConnector port="8161" />
- </connectors>
- <handlers>
- <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
- <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true" />
- </handlers>
- </jetty>
- </beans>
- <beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.org/config/1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
- http://activemq.apache.org/camel/schema/spring>
- <!-- persistent="true"表示要持久化存储消息,和子元素persistenceAdapter结合使用 -->
- <!-- dataDirectory默认的存储持久化数据的目录 -->
- <!-- brokerName 设置broker的name,在注意在网络上必须是唯一的-->
- <!-- 更多参考http://activemq.apache.org/xbean-xml-reference-50.html#XBeanXMLReference5.0-brokerelement -->
- <broker xmlns="http://activemq.org/config/1.0" brokerName="192.168.1.148" persistent ="true" dataDirectory="${activemq.base}/data" useShutdownHook="false">
- <!-- Destination specific policies using destination names or wildcards -->
- <!-- wildcards意义见http://activemq.apache.org/wildcards.html -->
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <!-- 这里使用了wildcards,表示所有以EUCITA开头的topic -->
- <policyEntry topic="EUCITA.>" producerFlowControl="false" memoryLimit="10mb">
- <!-- 分发策略 -->
- <dispatchPolicy>
- <!-- 按顺序分发 -->
- <strictOrderDispatchPolicy/>
- </dispatchPolicy>
- <!-- 恢复策略-->
- <subscriptionRecoveryPolicy>
- <!-- 只恢复最后一个message -->
- <lastImageSubscriptionRecoveryPolicy/>
- </subscriptionRecoveryPolicy>
- </policyEntry>
- </policyEntries>
- </policyMap>
- </destinationPolicy>
- <!-- The transport connectors ActiveMQ will listen to -->
- <transportConnectors>
- <transportConnector name="openwire" uri="tcp://192.168.1.148:61616" discoveryUri="multicast://default"/>
- <transportConnector name="ssl" uri="ssl://192.168.1.148:61617"/>
- <transportConnector name="stomp" uri="stomp://192.168.1.148:61613"/>
- <transportConnector name="xmpp" uri="xmpp://192.168.1.148:61222"/>
- </transportConnectors>
- <!-- 消息持久化方式 -->
- <persistenceAdapter>
- <amqPersistenceAdapter directory="${activemq.base}/data"/>
- </persistenceAdapter>
- </broker>
- <!-- lets create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
- <commandAgent xmlns="http://activemq.org/config/1.0"/>
- <!-- An embedded servlet engine for serving up the Admin console -->
- <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
- <connectors>
- <nioConnector port="8161" />
- </connectors>
- <handlers>
- <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
- <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true" />
- </handlers>
- </jetty>
- </beans>
注释
关于XML配置中元素的具体信息可以参考http://activemq.apache.org/xbean-xml-reference-50.html 下面介绍本篇配置使用的一些重要元素。
DispathPolicy
ActiveMQ支持3中不同的分发策略(避免翻译了以后误解,这里用原文):
- <roundRobinDispatchPolicy>:Simple dispatch policy that sends a message to every subscription that matches the message.
- <simpleDispatchPolicy>:Simple dispatch policy that sends a message to every subscription that matches the message.
- <strictOrderDispatchPolicy>:Dispatch policy that causes every subscription to see messages in the same order.
SubscriptionRecoveryPolicy
ActiveMQ支持6种恢复策略,可以自行选择使用不同的策略
- <fixedCountSubscriptionRecoveryPolicy>: keep a fixed count of last messages.
- <fixedSizedSubscriptionRecoveryPolicy>: keep a fixed amount of memory available in RAM for message history which is evicted in time order.
- <lastImageSubscriptionRecoveryPolicy>:only keep the last message.
- <noSubscriptionRecoveryPolicy>:disable recovery of messages.
- <queryBasedSubscriptionRecoveryPolicy>:perform a user specific query mechanism to load any messages they may have missed.
- <timedSubscriptionRecoveryPolicy>:keep a timed buffer of messages around in memory and use that to recover new subscriptions.
PersistenceAdapter
http://activemq.apache.org/persistence 讲解了关于persistence的信息。ActiveMQ5.0使用AMQ Message Store 持久化消息,这种方式提供了很好的性能(The AMQ Message Store is an embeddable transactional message storage solution that is extremely fast and reliable.) 默认使用该存储方式即可,如果想使用JDBC来存储,可以查找文档配置。
Summary
本篇文章只提供了基本配置信息。如果需要更多的文章,可以查看ActiveMQ的文档。
讲了安装和简单的配置.
本篇将通过一个实例介绍使用spring发送,消费topic, queue类型消息的方法. 不懂topic和queue的google 之.
TOPIC和QUEUE分别代表一个topic和一个queue消息通道.
- TopicMessageProducer向topic发送消息, TopicConsumerA和TopicConsumerB则从topic消费消息.
- QueueMessageProducer向Queue发送消息, QueueConsumer从Queue中消费消息
Spring整合JMS
就像对orm, web的支持一样, spring同样支持jms, 为整合jms到已有的项目提供了很多便利的方法. 本篇主要讲实战, 是所以先从配置开始, spring配置jms基本上需要8个部分.
- ConnectionFactory. 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.
- Destination. 有topic和queue两种方式.
- JmsTemplate. spring提供的jms模板.
- MessageConverter. 消息转换器.
- MessageProducer. 消息生产者.
- MessageConsumer. 消息消费者.
- MessageListener. 消息监听器
- MessageListenerContainer. 消息监听容器
下面以实例的方式介绍上面8个部分.
1. ConnectionFactory
- <amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />
brokerURL是指要连接的activeMQ server的地址, activeMQ提供了多种brokerURL, 集体可参见文档.一般我们使用嵌套的ActiveMQ server. 配置如下, 这个配置使用消息的存储机制, 服务器重启也不会丢失消息.
- <!-- embedded ActiveMQ Broker -->
- <amq:broker useJmx="false" persistent="true">
- <amq:persistenceAdapter>
- <amq:amqPersistenceAdapter directory="d:/amq"/>
- </amq:persistenceAdapter>
- <amq:transportConnectors>
- <amq:transportConnector uri="tcp://localhost:61616" />
- <amq:transportConnector uri="vm://localhost:0" />
- </amq:transportConnectors>
- </amq:broker>
2. Destination
在实例中我们使用了两种destination
- <!-- ActiveMQ destinations -->
- <!-- 使用topic方式-->
- <amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />
- <!-- 使用Queue方式-->
- <amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />
3. JmsTemplate
- <!-- Spring JmsTemplate config -->
- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory">
- <!-- lets wrap in a pool to avoid creating a connection per send -->
- <bean class="org.springframework.jms.connection.SingleConnectionFactory">
- <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
- </bean>
- </property>
- <!-- custom MessageConverter -->
- <property name="messageConverter" ref="defaultMessageConverter" />
- </bean>
4. MessageConverter
MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能. DefaultMessageConverter的实现见附件.
- <bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />
5. MessageProducer
实例拥有两个消息生产者, 消息生产者都是POJO, 实现见附件.
- <!-- POJO which send Message uses Spring JmsTemplate -->
- <bean id="topicMessageProducer" class="com.andyao.activemq.TopicMessageProducer">
- <property name="template" ref="jmsTemplate" />
- <property name="destination" ref="TOPIC" />
- </bean>
- <bean id="queueMessageProducer" class="com.andyao.activemq.QueuMessageProducer">
- <property name="template" ref="jmsTemplate" />
- <property name="destination" ref="QUEUE" />
- </bean>
6. MessageConsumer
TOPIC通道有两个消息消费者, QUEUE有一个消息消费者
- <!-- Message Driven POJO (MDP) -->
- <!-- consumer1 for topic a -->
- <bean id="topicConsumerA" class="com.andyao.activemq.TopicConsumerA" />
- <!-- consumer2 for topic a -->
- <bean id="topicConsumerB" class="com.andyao.activemq.TopicConsumerB" />
- <!-- consumer for queue -->
- <bean id="queueConsumer" class="com.andyao.activemq.QueueConsumer" />
7. MessageListener
每一个消息消费者都对应一个MessageListener
- <bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
- <constructor-arg ref="topicConsumerA" />
- <!-- may be other method -->
- <property name="defaultListenerMethod" value="receive" />
- <!-- custom MessageConverter define -->
- <property name="messageConverter" ref="defaultMessageConverter" />
- </bean>
- <bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
- <constructor-arg ref="topicConsumerB" />
- <!-- may be other method -->
- <property name="defaultListenerMethod" value="receive" />
- <!-- custom MessageConverter define -->
- <property name="messageConverter" ref="defaultMessageConverter" />
- </bean>
- <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
- <constructor-arg ref="queueConsumer" />
- <!-- may be other method -->
- <property name="defaultListenerMethod" value="receive" />
- <!-- custom MessageConverter define -->
- <property name="messageConverter" ref="defaultMessageConverter" />
- </bean>
8. MessageListenerContainer
有几个MessageListener既有几个MessageListenerContainer
- <bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="jmsConnectionFactory" />
- <property name="destination" ref="TOPIC" />
- <property name="messageListener" ref="topicListenerA" />
- </bean>
- <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="jmsConnectionFactory" />
- <property name="destination" ref="TOPIC" />
- <property name="messageListener" ref="topicListenerB" />
- </bean>
- <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="jmsConnectionFactory" />
- <property name="destination" ref="QUEUE" />
- <property name="messageListener" ref="queueListener" />
- </bean>
Summary
写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚:
- 可以有一个或者多个消息生产者向同一个destination发送消息.
- queue类型的只能有一个消息消费者.
- topic类型的可以有多个消息消费者.
- 每个消费者对应一个MessageListener和一个MessageListenerContainer.