Spring与ActiveMQ整合(多线程并发发送与接收消息)
本文介绍生产者和消费者双方如何使用多线程技术并发地发送和接收消息。
1.生产者
创建固定线程数为3的线程池,发送方向线程池提交了4个发送任务,每个任务发送10条消息。
package com.spring.thread.jms;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Demo producer: loads the Spring context from {@code beans.xml} and sends
 * text messages concurrently from a fixed-size thread pool.
 *
 * <p>{@link #TASK_COUNT} tasks are submitted to a pool of {@link #POOL_SIZE}
 * threads, so one task waits until a worker becomes free. Each task sends
 * {@link #MESSAGES_PER_TASK} messages with a random pause before each send.
 */
public class JmsThreadTest {
    /** Number of worker threads in the sender pool. */
    private static final int POOL_SIZE = 3;
    /** Number of sender tasks submitted to the pool. */
    private static final int TASK_COUNT = 4;
    /** Number of messages each task sends. */
    private static final int MESSAGES_PER_TASK = 10;

    /**
     * Starts the Spring context and submits the sender tasks.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // NOTE(review): the context is deliberately not closed here; the same
        // beans.xml appears to declare the listener container as well, so closing
        // it would also stop the consumer - confirm before adding ctx.close().
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("beans.xml");
        final MessageThreadService ms = (MessageThreadService) ctx.getBean("messageThreadService");

        ExecutorService threadPool = Executors.newFixedThreadPool(POOL_SIZE);
        for (int task = 1; task <= TASK_COUNT; task++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // One generator per task instead of one per message.
                    Random random = new Random();
                    for (int i = 0; i < MESSAGES_PER_TASK; i++) {
                        try {
                            // Pause 0, 500 or 1000 ms so the senders interleave.
                            Thread.sleep(random.nextInt(3) * 500);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt status and stop sending
                            // instead of swallowing the interruption.
                            Thread.currentThread().interrupt();
                            return;
                        }
                        System.out.println(Thread.currentThread().getName() + "处理发送消息" + i);
                        ms.sendMessage("你好:" + Thread.currentThread().getName() + "的消息" + i);
                    }
                }
            });
        }
        // Accept no further tasks; already-submitted tasks still run to completion,
        // after which the pool's worker threads terminate instead of idling forever.
        threadPool.shutdown();
    }
}
package com.spring.thread.jms;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
 * Sends text messages to the injected destination through a
 * {@link JmsTemplate}.
 *
 * <p>Registered as the Spring bean {@code messageThreadService}; both
 * collaborators are supplied through {@code @Resource} setter injection.
 */
@Component("messageThreadService")
public class MessageThreadService {

    /** Destination every message is sent to. */
    private Destination mQueue2;

    /** Template used to perform the actual send. */
    private JmsTemplate jmsTemplate;

    /**
     * Injects the template used for sending.
     *
     * @param jmsTemplate the JMS template
     */
    @Resource
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Injects the destination messages are sent to.
     *
     * @param mQueue2 the target destination
     */
    @Resource
    public void setmQueue2(Destination mQueue2) {
        this.mQueue2 = mQueue2;
    }

    /**
     * Sends {@code message} as a JMS text message to the injected destination.
     *
     * @param message the text payload
     */
    public void sendMessage(final String message) {
        // Builds the text message from whatever session the template hands in.
        MessageCreator textCreator = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                Message textMessage = session.createTextMessage(message);
                return textMessage;
            }
        };
        jmsTemplate.send(mQueue2, textCreator);
    }
}
2.消费者
消费者这边有两种方式去并发接收消息
1)在onMessage方法中,跟生产者那边一样创建线程池去完成接收任务,本例中有6个线程去处理接收任务
2)可在spring的xml文件中配置并发的线程数
方式一:代码中手动创建线程池
package com.spring.thread.jms;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
/**
 * Message listener that hands every received text message to its own
 * fixed-size thread pool, so the printing runs on a pool thread rather than
 * on the thread that invoked {@link #onMessage(Message)}.
 *
 * <p>NOTE(review): the pool is never shut down in this class; consider adding
 * a destroy callback if the application context is ever closed.
 */
@Component("consumerThreadMessageListener")
public class ConsumerThreadMessageListener implements MessageListener {

    /** Worker pool of 6 threads that processes received messages. */
    private final ExecutorService threadPool = Executors.newFixedThreadPool(6);

    /**
     * Extracts the text payload and submits it to the worker pool.
     *
     * <p>Messages that are not {@link TextMessage}s are reported and skipped
     * instead of failing with a {@code ClassCastException} on a pool thread.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(final Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("忽略非文本消息:" + message);
            return;
        }

        // Read the payload here, on the delivering thread, so the worker never
        // touches the JMS Message object after this callback has returned.
        final String text;
        try {
            text = ((TextMessage) message).getText();
        } catch (JMSException e) {
            e.printStackTrace();
            return;
        }

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("接收线程" + Thread.currentThread().getName() + "接收消息:" + text);
            }
        });
    }
}
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-2.5.xsd
           http://www.springframework.org/schema/jms
           http://www.springframework.org/schema/jms/spring-jms-2.5.xsd">

    <!-- Annotation-driven configuration plus component scanning of com.spring. -->
    <context:annotation-config/>
    <context:component-scan base-package="com.spring"/>

    <!-- The real ConnectionFactory that creates connections; supplied by the JMS vendor. -->
    <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!-- Spring caching connection factory: the ConnectionFactory Spring uses to
         manage the real one. -->
    <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Target: the real ConnectionFactory that produces JMS connections. -->
        <property name="targetConnectionFactory" ref="connectinFactory"/>
        <!-- Number of sessions to cache. -->
        <property name="sessionCacheSize" value="10"/>
    </bean>

    <!-- Message destination. -->
    <!-- Queue: only one consumer receives each message; once processed, the
         message no longer exists in the queue. -->
    <bean id="mQueue2" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="MessageQueue2"/>
    </bean>

    <!-- Spring JMS template used for sending. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
    </bean>

    <!-- Listener container delivering messages from MessageQueue2 to the
         consumerThreadMessageListener bean. -->
    <jms:listener-container container-type="default"
                            connection-factory="cachingConnectionFactory"
                            acknowledge="auto">
        <jms:listener destination="MessageQueue2" ref="consumerThreadMessageListener"/>
    </jms:listener-container>
</beans>
注:实际运行时,消息一发送,监听器就已经开始接收了。为了看得更清楚,我在接收方设置了断点将其阻塞,等发送方把消息全部发送完之后才开始接收,所以得到上面图中的结果。
方式二:xml配置线程数
接收方也可以通过修改xml配置来指定并发接收消息的线程数,这样在onMessage()方法中就不需要再手动创建线程池去处理了,只需要在监听容器上添加concurrentConsumers属性即可(本例配置为7)。
修改如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-2.5.xsd
           http://www.springframework.org/schema/jms
           http://www.springframework.org/schema/jms/spring-jms-2.5.xsd">

    <!-- Annotation-driven configuration plus component scanning of com.spring. -->
    <context:annotation-config/>
    <context:component-scan base-package="com.spring"/>

    <!-- The real ConnectionFactory that creates connections; supplied by the JMS vendor. -->
    <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!-- Spring caching connection factory: the ConnectionFactory Spring uses to
         manage the real one. -->
    <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Target: the real ConnectionFactory that produces JMS connections. -->
        <property name="targetConnectionFactory" ref="connectinFactory"/>
        <!-- Number of sessions to cache. -->
        <property name="sessionCacheSize" value="10"/>
    </bean>

    <!-- Message destination. -->
    <!-- Queue: only one consumer receives each message; once processed, the
         message no longer exists in the queue. -->
    <bean id="mQueue2" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="MessageQueue2"/>
    </bean>

    <!-- Spring JMS template used for sending. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
    </bean>

    <!-- Listener container declared as a plain bean: 7 concurrent consumers on
         mQueue2, running on the MessageExecutor task executor. -->
    <bean id="ListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="receiveTimeout" value="1000"/>
        <property name="destination" ref="mQueue2"/>
        <property name="messageListener" ref="consumerThreadMessageListener"/>
        <property name="taskExecutor" ref="MessageExecutor"/>
        <property name="concurrentConsumers" value="7"/>
    </bean>

    <!-- Thread pool backing the listener container.
         NOTE(review): no queueCapacity is set here; with the executor's default
         queue the pool may never grow beyond corePoolSize, so maxPoolSize might
         have no effect - confirm against the ThreadPoolTaskExecutor docs. -->
    <bean id="MessageExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="20"/>
        <property name="maxPoolSize" value="100"/>
        <property name="daemon" value="true"/>
        <property name="keepAliveSeconds" value="120"/>
    </bean>
</beans>
onMessage方法修改如下:
package com.spring.thread.jms;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
/**
 * Message listener that prints each received text message together with the
 * name of the thread that delivered it.
 *
 * <p>This variant creates no threads of its own; it runs on whatever thread
 * invokes {@link #onMessage(Message)}.
 */
@Component("consumerThreadMessageListener")
public class ConsumerThreadMessageListener implements MessageListener {

    /**
     * Prints the text payload of {@code message}.
     *
     * <p>Messages that are not {@link TextMessage}s are reported and skipped
     * instead of letting a {@code ClassCastException} escape from the listener.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(final Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("忽略非文本消息:" + message);
            return;
        }
        try {
            String text = ((TextMessage) message).getText();
            System.out.println("接收线程" + Thread.currentThread().getName() + "接收消息:" + text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}