ActiveMQ (二) JMS入门
ActiveMQ (二) JMS入门
JMS入门
前提:安装好了ActiveMQ ActiveMQ安装
Demo结构:
首先pom.xml引入依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
点对点模式:
消息生产者QueueProducer:
package com.zy.demo.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 点对点模式 生产者 */ public class QueueProducer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息生产者 MessageProducer producer = session.createProducer(queue); //7.创建消息 TextMessage textMessage = session.createTextMessage("点对点模式"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
消息消费者QueueConsumer:
package com.zy.demo.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 点对点模式 消费者 */ public class QueueConsumer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(queue); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收到消息:" + textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
发布订阅模式:
消息生产者TopicProducer:
package com.zy.demo.toptic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 发布/订阅模式 生产者 */ public class TopicProducer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("发布订阅模式"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
消息消费者TopicConsumer:
package com.zy.demo.toptic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicConsumer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收到消息:" + textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }