Active MQ使用
Kagula
2011-9-6
介绍
Active MQ 是一个消息队列管理器,是用于应用程序之间通讯的消息中间件。
Java + Active MQ的常见使用方式有两种:[1]点对点方式(Producer/Consumer)[2]发布/订阅者方式(Publisher/Subscriber Model)
测试环境[1]JDK1.6.x [2]Eclipse Indigo [3]Active MQ 5.4.2
建议不要使用Active MQ 5.5.0 因为activemq-all-5.5.0.jar缺少依赖项。
正文
开始之前,请先按照参考资料[1]的说明,确保 Active MQ 已经正确安装并启动。
点对点(Producer、Consumer)模式
图一 JMS对象模型
Producer端源码示例
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Message producer (sender) for the point-to-point example.
 *
 * <p>Connects to the broker at {@code tcp://127.0.0.1:61616}, sends one
 * non-persistent text message to the queue {@code my-queue} inside a
 * transacted session, commits the transaction and closes the connection.
 */
public class JmsSender {
    /**
     * Runs the sender once.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or committing fails
     */
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory: the factory JMS uses to create connections.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://127.0.0.1:61616");
        // Connection from the JMS client to the JMS provider.
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Session: a single-threaded context for sending or receiving messages.
            // First argument false: the session is not transacted, messages are
            // delivered immediately and session.commit() must not be called.
            // First argument true: the session is transacted, and messages only
            // reach the broker once session.commit() is called.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // Destination: where the message goes. "my-queue" is the queue name.
            Destination destination = session.createQueue("my-queue");
            // MessageProducer: the object that actually sends messages.
            MessageProducer producer = session.createProducer(destination);
            // Do not persist the message on the broker.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Send one message.
            sendMsg(session, producer);
            // The session is transacted: without this commit the send is rolled
            // back when the connection is closed and the message never arrives.
            session.commit();
        } finally {
            // Always release the connection, even when a JMS call above failed.
            connection.close();
        }
    }

    /**
     * Sends one text message through the given producer on the given session.
     * Does not commit; the caller owns the transaction.
     *
     * @param session  the session used to create the message
     * @param producer the producer used to send the message
     * @throws JMSException if the message cannot be created or sent
     */
    public static void sendMsg(Session session, MessageProducer producer) throws JMSException {
        // Create a text message.
        TextMessage message = session.createTextMessage("Hello ActiveMQ!");
        // Hand it to the producer.
        producer.send(message);
        System.out.println("");
    }
}
Consumer端源码示例
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Message consumer (receiver) for the point-to-point example.
 *
 * <p>Connects to the broker at {@code tcp://127.0.0.1:61616}, drains the queue
 * {@code my-queue} inside a transacted session until no message arrives within
 * one second, then commits and closes the connection.
 */
public class JmsReceiver {
    /**
     * Runs the receiver once.
     *
     * @param args unused
     * @throws JMSException if connecting, receiving or committing fails
     */
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory: the factory JMS uses to create connections.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://127.0.0.1:61616");
        // Connection from the JMS client to the JMS provider.
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Session: a single-threaded context for sending or receiving messages.
            // First argument false: not transacted, messages are acknowledged
            // automatically and session.commit() must not be called.
            // First argument true: transacted; until session.commit() is called
            // the received messages stay on the broker and would be delivered
            // again on the next receive.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // Destination: the queue to receive from.
            Destination destination = session.createQueue("my-queue");
            // The consumer that receives the messages.
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                // Give up when nothing arrives within one second.
                Message message = consumer.receive(1000);
                if (message == null) {
                    break;
                }
                // Check the type instead of casting blindly, so a non-text
                // message on the queue does not abort the run.
                if (message instanceof TextMessage) {
                    System.out.println("收到消息:" + ((TextMessage) message).getText());
                } else {
                    System.out.println("收到非文本消息:" + message);
                }
            }
            session.commit();
            session.close();
        } finally {
            // Always release the connection; if commit was not reached the
            // transaction is rolled back and the messages stay on the queue.
            connection.close();
        }
    }
}
Publisher/Subscriber Model
消息传播以一对多的关系传递,即一个Publisher,多个Subscriber。
源码示例
/**
 * Helper types shared by the messaging samples.
 */
public class SampleUtilities {
    /**
     * A one-shot latch: threads block in {@link #waitTillDone()} until some
     * other thread calls {@link #allDone()}. Once done, it stays done.
     */
    static public class DoneLatch {
        // Guarded by this object's monitor.
        boolean done = false;

        /**
         * Waits until done is set to true.
         *
         * <p>An interrupt does not end the wait; it is remembered and the
         * thread's interrupt status is restored before this method returns.
         */
        public void waitTillDone() {
            boolean interrupted = false;
            synchronized (this) {
                // Loop guards against spurious wake-ups and interrupts.
                while (!done) {
                    try {
                        this.wait();
                    } catch (InterruptedException ie) {
                        // Keep waiting, but do not lose the interrupt.
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Sets done to true and releases every waiting thread.
         */
        public void allDone() {
            synchronized (this) {
                done = true;
                // notifyAll, not notify: several threads may wait on one latch.
                this.notifyAll();
            }
        }
    }
}
import java.util.Date;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publish/subscribe demo: one publisher thread sends a series of text messages
 * to a topic, followed by a non-text control message, while one subscriber
 * thread receives them asynchronously through a {@link MessageListener}.
 *
 * @author Kim Haase
 * @author Kagula (comments and modifications)
 * @version 1.6, 08/18/00; last updated 09/06/11
 */
public class AsynchTopicExample {
    final String CONTROL_QUEUE = "controlQueue";
    // Name of the topic both threads use; must be set before run_threads().
    String topicName = null;
    // Set to 1 by either thread when a JMS error occurs while running.
    int exitResult = 0;

    /**
     * Creates a connection factory for the default ActiveMQ user, password and
     * broker URL.
     *
     * @return a new ActiveMQ connection factory
     * @throws JMSException declared for callers; not thrown by the visible code
     */
    public static ConnectionFactory getJmsConnectionFactory()
            throws JMSException {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        return new ActiveMQConnectionFactory(user, password, url);
    }

    /**
     * Subscriber thread: registers a listener on the topic and blocks until the
     * listener sees a non-text (control) message.
     */
    public class AsynchSubscriber extends Thread {
        // Released once the subscription is active, or once this thread has
        // given up, so that whoever starts the publisher can wait for it.
        final SampleUtilities.DoneLatch ready = new SampleUtilities.DoneLatch();

        /**
         * Prints every text message; any other message type is taken as the
         * publisher's end-of-stream signal.
         */
        private class TextListener implements MessageListener {
            final SampleUtilities.DoneLatch monitor = new SampleUtilities.DoneLatch();

            // Called for each message delivered to the subscriber.
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("订阅者线程:读取消息: " + msg.getText());
                    } catch (JMSException e) {
                        System.out.println("Exception in onMessage(): " + e.toString());
                    }
                } else {
                    // Non-text message: the publisher says the stream is over.
                    monitor.allDone();
                }
            }
        } // end of TextListener

        /**
         * Runs the thread.
         */
        public void run() {
            ConnectionFactory topicConnectionFactory = null;
            Connection topicConnection = null;
            Session topicSession = null;
            Topic topic = null;
            MessageConsumer topicSubscriber = null;
            TextListener topicListener = null;
            try {
                topicConnectionFactory = AsynchTopicExample.getJmsConnectionFactory();
                topicConnection = topicConnectionFactory.createConnection();
                topicSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                topic = topicSession.createTopic(topicName);
            } catch (Exception e) {
                System.out.println("Connection problem: " + e.toString());
                if (topicConnection != null) {
                    try {
                        topicConnection.close();
                    } catch (JMSException ignored) {
                        // Best effort: the process exits right below.
                    }
                }
                System.exit(1);
            }
            try {
                topicSubscriber = topicSession.createConsumer(topic);
                topicListener = new TextListener();
                topicSubscriber.setMessageListener(topicListener);
                topicConnection.start();
                // The subscription is in place: publishing may begin.
                ready.allDone();
                /*
                 * Asynchronously process messages.
                 * Block until publisher issues a control message indicating
                 * end of publish stream.
                 */
                topicListener.monitor.waitTillDone();
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                // Also release the latch on failure so nobody waits forever.
                ready.allDone();
                if (topicConnection != null) {
                    try {
                        topicConnection.close();
                    } catch (JMSException e) {
                        exitResult = 1;
                    }
                }
            }
        }
    }

    /**
     * The MultiplePublisher class publishes several message to a topic.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    public class MultiplePublisher extends Thread {
        /**
         * Runs the thread.
         */
        public void run() {
            ConnectionFactory topicConnectionFactory = null;
            Connection topicConnection = null;
            Session topicSession = null;
            Topic topic = null;
            MessageProducer topicPublisher = null;
            TextMessage message = null;
            final int NUMMSGS = 20;
            final String MSG_TEXT = "Here is a message";
            try {
                topicConnectionFactory = AsynchTopicExample.getJmsConnectionFactory();
                topicConnection = topicConnectionFactory.createConnection();
                topicSession = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                topic = topicSession.createTopic(topicName);
            } catch (Exception e) {
                System.out.println("Connection problem: " + e.toString());
                if (topicConnection != null) {
                    try {
                        topicConnection.close();
                    } catch (JMSException ignored) {
                        // Best effort: the process exits right below.
                    }
                }
                System.exit(1);
            }
            try {
                topicPublisher = topicSession.createProducer(topic);
                message = topicSession.createTextMessage();
                for (int i = 0; i < NUMMSGS; i++) {
                    message.setText(MSG_TEXT + " " + (i + 1));
                    System.out.println("发布者: 发布消息: " + message.getText());
                    topicPublisher.send(message);
                }
                // Send a non-text control message indicating end of messages.
                topicPublisher.send(topicSession.createMessage());
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            } finally {
                if (topicConnection != null) {
                    try {
                        topicConnection.close();
                    } catch (JMSException e) {
                        exitResult = 1;
                    }
                }
            }
        }
    }

    /**
     * Starts the subscriber, waits until it is subscribed, then starts the
     * publisher and waits for both threads to finish.
     *
     * <p>The subscriber must be subscribed first: it only receives messages
     * published while its subscription is active, and it blocks until it sees
     * the publisher's final control message. If the publisher ran first, the
     * subscriber could miss that message and never return.
     */
    public void run_threads() {
        AsynchSubscriber asynchSubscriber = new AsynchSubscriber();
        MultiplePublisher multiplePublisher = new MultiplePublisher();
        asynchSubscriber.start();
        // Hold the publisher back until the subscriber is listening (or failed).
        asynchSubscriber.ready.waitTillDone();
        multiplePublisher.start();
        try {
            asynchSubscriber.join();
            multiplePublisher.join();
        } catch (InterruptedException e) {
            // Stop waiting but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs the demo on the topic "MyFirstTopic".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AsynchTopicExample ate = new AsynchTopicExample();
        ate.topicName = "MyFirstTopic";
        System.out.println("Topic name is " + ate.topicName);
        ate.run_threads();
    }
}
参考资料
[1]《Active MQ入门(转) 》
http://yeweiyun868.blog.163.com/blog/static/563784432010112301916556/
[2]《Eclipse中添加Src和JavaDoc的方法》
http://hi.baidu.com/rebeccacao/blog/item/d3f67ed3af8384229b5027bf.html
[3]Active MQ 官网
http://activemq.apache.org/
[4]《Java Message Service Specification》含JMS1.1标准示例代码
http://www.oracle.com/technetwork/java/docs-136352.html
[5]《JMS Example: Publish and Subscribe》
http://jmsexample.zcage.com/index2.html