ActiveMQ 基本使用
原文:https://blog.csdn.net/liuyuanq123/article/details/79109218
ActiveMQ介绍
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
1、支持多种语言编写客户端
2、对spring的支持,很容易和spring整合
3、支持多种传输协议:TCP,SSL,NIO,UDP等
4、支持AJAX
消息形式:
1、点对点(queue)
2、一对多(topic)
pom中添加依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
queue的发送代码:
public void testMQProducerQueue() throws Exception{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
//6、使用会话对象创建生产者对象
MessageProducer producer = session.createProducer(queue);
//7、使用会话对象创建一个消息对象
TextMessage textMessage = session.createTextMessage("hello!test-queue");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
接受的代码:
public void TestMQConsumerQueue() throws Exception{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
//6、使用会话对象创建生产者对象
MessageConsumer consumer = session.createConsumer(queue);
//7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//8、程序等待接收用户消息
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}
topic发送代码:
public void TestTopicProducer() throws Exception{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Topic topic = session.createTopic("test-topic");
//6、使用会话对象创建生产者对象
MessageProducer producer = session.createProducer(topic);
//7、使用会话对象创建一个消息对象
TextMessage textMessage = session.createTextMessage("hello!test-topic");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
topic接受代码:
public void TestTopicConsumer() throws Exception{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Topic topic = session.createTopic("test-topic");
//6、使用会话对象创建生产者对象
MessageConsumer consumer = session.createConsumer(topic);
//7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//8、程序等待接收用户消息
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}
与spring整合,applicationContext-activemq.xml文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
<bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
</bean>
<!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
</bean>
<!-- 配置destination -->
<!-- 队列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="spring-queue"/>
</bean>
<!-- 话题目的地 -->
<bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-add-topic"/>
</bean>
<!-- 配置监听器 -->
<bean id="myListener" class="com.taotao.search.listener.MyListener"/>
<bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/>
<!-- 系统监听器 -->
<!-- <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="myListener"/>
</bean> -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="itemAddTopic"/>
<property name="messageListener" ref="itemAddListener"/>
</bean>
</beans>