ActiveMQ(点对点使用)
**
ActiveMQ的点对点使用
**
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。
Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝的确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置
为 true。
**
一、创建消息生产者
**
先导入Jar包
代码:
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session = null;//会话
Destination destination;//消息目的地 就是一个消息队列
MessageProducer messageProducer = null;//消息生产者
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
try {
//1、连接
connection = connectionFactory.createConnection();//通过工厂获取连接
connection.start();//启动连接
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session
destination = session.createQueue("短信发送T");
messageProducer = session.createProducer(destination);//创建消息生产者
//2、发送消息
for(int i=0;i<10;i++){
String text = "123456:13800001111"+i;
TextMessage msg = session.createTextMessage(text);
messageProducer.send(destination,msg);
System.out.println("发送到MQ:"+i);
}
session.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
System.out.println("关闭");
//3.断开
try{
messageProducer.close();
session.close();
connection.close();
}catch (Exception e){
}
}
}
}
生产者循环10次发送了10条消息,启动之后可以在控制台看到消息队列里有10条待接收的消息。
**
二、消息消费者
**
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session = null;//会话
Destination destination;//消息目的地 就是一个消息队列
MessageConsumer messageConsumer = null;//消息消费者
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
try {
//1、连接MQ
connection = connectionFactory.createConnection();//通过工厂获取连接
connection.start(); //启动连接
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session
destination = session.createQueue("短信发送T"); //这里的队列的名字必须与生产者的队列名字一样
messageConsumer = session.createConsumer(destination);//创建消息消费者
//2、接收消息
for(int i=0;i<5;i++){
TextMessage textMessage = (TextMessage) messageConsumer.receive();
System.out.println(textMessage.getText());
}
session.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
System.out.println("关闭");
//3.断开
try{
messageConsumer.close();
session.close();
connection.close();
}catch (Exception e){
}
}
}
}
消费者启动之后循环了5次,接收了5条消息。启动之后可以在在控制台发现,已经消费了5条消息。
**
三、消息监听器
**
重写消息消费者:在消息接收的时候调用监听类
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session = null;//会话
Destination destination;//消息目的地 就是一个消息队列
MessageConsumer messageConsumer = null;//消息消费者
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
try {
//1、连接MQ
connection = connectionFactory.createConnection();//通过工厂获取连接
connection.start(); //启动连接
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session
destination = session.createQueue("短信发送T"); //这里的队列的名字必须与生产者的队列名字一样
messageConsumer = session.createConsumer(destination);//创建消息消费者
//2、消息监听器
messageConsumer.setMessageListener(new MyMessageListener()); //调用监听类
}catch (Exception e){
e.printStackTrace();
}
}
}
监听类:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage)message;
System.out.println("从MQ获取的消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
将接收到的消息输出
同时监听收到的消息,不会减少消息生产者的待监听消息数量