JMS ActiveMQ
1 JMS消息中间件
2.1 消息中间件介绍
什么是中间件?
redis缓存服务器就是一个中间件。独立于系统之外的一个服务器
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java DatabaseConnectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等实现高性能,高可用,可伸缩和最终一致架构。
使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ, Kafka,MetaMQ,RocketMQ
也叫作消息服务中间件,消息容器!
使用java消息服务解决的问题?
主要是在分布式系统中,通过消息服务系统解决高并发访问的情况处理,提高系统处理效率。
2.2 消息队列应用场景
MQ Message Queue 就是消息队列。
以下介绍消息队列在实际应用中常用的四个使用场景
A. 异步处理,
B. 应用解耦,
C. 流量削锋
D. 消息通讯
2.2.1 异步处理
场景说明:
用户注册后,需要发注册邮件和注册短信。
传统的做法有两种:
1串行的方式;
2.并行方式
(1) 串行方式
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
(2) 并行方式.
将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。
小结
如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下
(3) 消息队列方式
用户等待的响应时间55ms,大大提升。
使用消息队列提高响应速度
2.2.2 应用解耦
场景说明:
用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图
传统模式的缺点:
l 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
如何解决以上问题呢?
l 订单系统与库存系统耦合
引入应用消息队列后的方案,如下图:
l 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
l 库存系统订阅下单的消息,采用pub/sub(发布/订阅)的方式,获取下单信息,库存系统根据下单信息,进行库存操作
l 假如在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
2.2.3 流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:
秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个回题,一般需要在应用前端加入消息队列。
模型图
模型图说明
l 可以控制活动的人数
l 可以缓解短时间内高流量压垮应用
l 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
l 秒杀业务根据消息队列中的请求信息,再做后续处理
2.2.4 日志处理
目志处理是指将消息队列用在目志处理中,比如Kafka的应用,解决大量日志传输的问题。
架构简化如下
日志采集客户端,负责目志数据采集,定时写受写入Kafka队列
Kafka消息队列,负责目志数据的接收,存储和转发
目志处理应用:订阅并消费kafka队列中的目志数据
(1) Kafka :接收用户目志的消息队列
(2) Logstash:做目志解析,统一成JSON输出给Elasticsearch
(3) Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能
(4) Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因
ELK平台:
E: Elasticsearch搜索服务
L: Logstash 日志解析
Kibana 可视化视图组件
ELK平台部署参考: 《开源实时日志分析ELK平台部署.docx》
2.2.5 消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊大室等。
(1) 点对点通讯:
客户端A和客户端B使用同一队列,进行消息通讯。
(2) 聊大室通讯:
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊大室效果。
以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。
3 JMS消息服务相关概念
消息队列的JAVAEE规范JMS。JMS (Java MessageService,java消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。
3.1 消息模型
3.1.1 概述
在JMS标准中,有两种消息模型:
(1) P2P (Point to Point) 点对点模型(Queue队列模型)
(2) Publish/Subscribe(Pub/Sub) 发布/订阅模型(Topic主题模型)
3.1.2 P2P 模型
l P2P 模型图
|
|
l 涉及概念
(1) 消息队列(Queue),
(2) 发送者(Sender),
(3) 接收者(Receiver). 消费者。
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
l P2P的特点
(1) 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
(2) 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
(3) 接收者在成功接收消息之后需向队列应答成功
(4) 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。
3.1.3 Pub/Sub模式
l Pub/Sub模式图
l 涉及概念
(1) 主题(Topic),
(2) 发布者(Publisher),
(3) 订阅者(Subscriber)
多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
l Pub/Sub的特点
(1) 每个消息可以有多个消费者
(2) 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者, 它必须创建一个订阅者之后,才能消费发布者的消息
(3) 为了消费消息,订阅者必须保持运行的状态
(4) 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被**(运行),它也能接收到发布者的消息。
(5) 如果希望发送的消息可以不被做任何处理、以及只被一个消息者处或者被多个消费者处理的话,那么可以采用Pub/Sub模型。(群发)
3.1.4 小结
3.2 消息消费
在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
(1) 同步
订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻寨;
消费者等待消费: 如果队列/主题中没有消息,那就一直等待。知道有消息消费就结束。
(2) 异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
消费者监听消息。
JNDI:java命名和目录接口是一种标准的]ava命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。
JNDI 在JMS中起到查找和访回发送目标或消息来源的作用。
3.3 JMS编程模型 API
(1) ConnectionFactory
创建connection对象的工厂,针对两种不同的jms消息模型,分别有 QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找 ConnectionFactory对象。
(2) Destination
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic),对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
所以,Destination实际上就是两种类型的对象· Queue、Topic可以通过JNDI来查找 Destination。
(3) Connection
connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。
Connection可以产生一个或多个Session,跟ConnectionFactory一样,Connection 也有两种类型:Queueconnection和TopicConnection
(4) session
session是操作消息的接口。可以通过session创建生产者、消费者、消息等。session 提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession.
(5) 消息的生产者
消息生产者由session创建,并用于将消息发送到Destination.同样,消息生产者分两种类型:QueueSender和TopicPublisher可以调用消息生产者的方法(send或publish 方法)发送消息。
(6) 消息消费者
消息消费者由session创建,用于接收被发送到Destination的消息。两种类型 QueueReceiver和TopicSubscriber可分别通过session的createReceiver(Queue) 或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
(7) MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage 方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
4 消息队列ActiveMQ
一般商用的容器,比如WebLogic,JBoss,都支持JMS标准,开发上很方便。但免费的比如Tomcat,jetty等则需要使用第三方的消息中间件。本部分内容介绍常用的消息中间件(Active MQ,RabbitMQ,Zero MQ,Kafka)以及他们的特点。
4.1 ActiveMQ
ActiveMQ是Apache出品最流行的,能力强劲的开源消息总线。
ActiveMQ是一个完全支持]JMS1.1和j2EE 1 4规范的JMSProvider实现。尽管JMS规范出台己经是很久的事情了,但是JMS在当今的] 2EE应用中间仍然扮演着特殊的地位。
MQ全称为MessageQueue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。
ActiveMQ特性如下.
(1) 多种语言和协议编写客户端。
(2) 语言:Java,C,C++,C#,Ruby,Perl,Python,PHP
(3) 应用协议OpenWire,StompREST,WS Notification,XMPP,AMQP
(4) 完全支持JSM1.1和J2EE 1 4规范(持久化,XA消息,事务)
(5) 对Spring 的支持,ActiveMQ可以很容易内嵌到使用spring的系统里面去,而且也支持Spring2.0及更高版本的特性
(6) 通过了常见J2EE服务器(如Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4的商业服务器上。
(7) 持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
(8) 支持通过JDBC和journal提供高速的消息持久化
(9) 从设计上保证了高性能的集群,客户端-服务器,点对点
(10) 支持Ajax
(11) 支持与Axis的整合
(12) 可以很容易得调用内嵌JMS provider,进行测试
4.2 ActivieMQ 下载安装
点击download,
l 下载:
apache-activemq-5.14.5-bin.zip
l 解压:
apache-activemq-5.14.5\bin\win64\activemq.bat
l 启动访问:
账号密码都是admin
4.2.1 依赖
ilcps_parent 中的pom.xml配置文件以及包含下面的坐标
4.3 Queue 消息发送与接收
4.3.1 Queue消息发送
package cn.itcast.jms.test;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 发送消息到Queue队列中。 * Queue名称是: hello */ publicclass Queue_1_sender {
// 发送消息 publicstaticvoid main(String[] args) throws Exception { //1. 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
//2. 创建连接 Connection conn = connectionFactory.createConnection(); // 开启连接 conn.start();
//3. 创建session // 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。 // 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器) Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建消息发送的目的地对象 //interface javax.jms.Queue extends javax.jms.Destination Queue queue = session.createQueue("hello");
//5. 创建消息 TextMessage msg = session.createTextMessage(); msg.setText("发送Queue消息到hello队列中.....");
//6. 消息生产者 MessageProducer messageProducer = session.createProducer(queue);
//7. 发送消息 messageProducer.send(msg);
// 关闭 session.close(); conn.close(); } } |
4.3.2 Queue消息消费
4.3.2.1 方式1:调用receive() 方法
package cn.itcast.jms.test;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 消费Queue队列中的消息: * Queue名称是: hello */ publicclass Queue_2_consumer {
// 发送消息 publicstaticvoid main(String[] args) throws Exception { //1. 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
//2. 创建连接 Connection conn = connectionFactory.createConnection(); // 开启连接 conn.start();
//3. 创建session // 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。 // 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器) Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4. 创建消息发送的目的地对象 //interface javax.jms.Queue extends javax.jms.Destination Queue queue = session.createQueue("hello");
//6. 创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue);
//7. 消费消息 (同步) // receive() 调用这个方法,如果容器中没有消息,线程处于阻塞状态。直到有消息才结束当前线程。 // receive(5000) 从容器中如果没有拿到消息的等待时间 5秒 Message message = messageConsumer.receive(5000); if (message != null) { TextMessage msg = (TextMessage) message; System.out.println("----->" + msg.getText()); }
// 提交事务、关闭 session.commit(); session.close(); conn.close(); } } |
|
4.3.2.2 方式2:监听器方式消费消息
package cn.itcast.jms.test;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 消费Queue队列中的消息: * Queue名称是: hello */ publicclass Queue_2_consumer2_listener {
// 发送消息 publicstaticvoid main(String[] args) throws Exception { //1. 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
//2. 创建连接 Connection conn = connectionFactory.createConnection(); // 开启连接 conn.start();
//3. 创建session // 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。 // 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器) Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4. 创建消息发送的目的地对象 //interface javax.jms.Queue extends javax.jms.Destination Queue queue = session.createQueue("hello");
//6. 创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue);
//7. 消费消息 (监听器消费消息,异步) messageConsumer.setMessageListener(new MessageListener() { publicvoid onMessage(Message message) { TextMessage msg = (TextMessage) message; try { System.out.println(msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
// 保持监听器的运行 while(true){}
/* 提交事务、关闭 session.commit(); session.close(); conn.close(); */ } } |
4.3.3 Topic 消息发送与接收
4.3.4 Topic消息发送
/** * 发送消息到Topic主题消息中 */ publicclass Topic_1_sender {
// 发送消息 publicstaticvoid main(String[] args) throws Exception { //1. 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); //2. 创建连接 Connection conn = connectionFactory.createConnection(); // 开启连接 conn.start(); //3. 创建session Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. 创建消息发送的目的地对象 Topic topic = session.createTopic("hello2");
//5. 创建消息 TextMessage msg = session.createTextMessage(); msg.setText("发送Topic消息到hello2队列中.....");
//6. 消息生产者 MessageProducer messageProducer = session.createProducer(topic);
//7. 发送消息 messageProducer.send(msg);
// 关闭 session.close(); conn.close(); } } |
4.3.5 Topic消息接收
/** * 消费消息 */ publicclass Topic_2_consumer {
// 发送消息 publicstaticvoid main(String[] args) throws Exception { //1. 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); //2. 创建连接 Connection conn = connectionFactory.createConnection(); // 开启连接 conn.start(); //3. 创建session Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. 创建消息发送的目的地对象 Topic topic = session.createTopic("hello2");
//5. 创建消息消费者 MessageConsumer messageConsumer = session.createConsumer(topic);
//6. 消费消息 messageConsumer.setMessageListener(new MessageListener() {
@Override publicvoid onMessage(Message message) { try { TextMessage msg = (TextMessage) message; System.out.println("消费消息成功---->" + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
// 保持监听器运行 while(true){} } } |
4.4 多个消费者测试
4.4.1 队列消息
l 需求:有10个Queue队列消息,2个消费者,
l 每个消费者消费多少个消息?
5个。 消息容器会确保消息平均分配给每个消费者。
1.发送10个消息 |
/** * 发送多个消息测试 */ publicclass Queue_1_sender {
// 发送消息 publicstaticvoid main(String[] args) throws Exception { //1. 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
//2. 创建连接 Connection conn = connectionFactory.createConnection(); // 开启连接 conn.start();
//3. 创建session // 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。 // 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器) Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建消息发送的目的地对象 //interface javax.jms.Queue extends javax.jms.Destination Queue queue = session.createQueue("hello");
//6. 消息生产者 MessageProducer messageProducer = session.createProducer(queue);
//7. 发送消息 for (inti=1; i<=10; i++) { MapMessage msg = session.createMapMessage(); msg.setString("email", i+"[email protected]"); msg.setString("phone", i+"18665591009"); // 发送多个消息 messageProducer.send(msg); }
// 关闭 session.close(); conn.close(); } } |
2.消费者1 |
/** * 消费者1 */ publicclass Queue_2_consumer1 {
// 发送消息 publicstaticvoid main(String[] args) throws Exception { //1. 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
//2. 创建连接 Connection conn = connectionFactory.createConnection(); // 开启连接 conn.start();
//3. 创建session // 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。 // 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器) Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建消息发送的目的地对象 //interface javax.jms.Queue extends javax.jms.Destination Queue queue = session.createQueue("hello");
//6. 创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue);
//7. 消费消息 (监听器消费消息,异步) messageConsumer.setMessageListener(new MessageListener() { publicvoid onMessage(Message message) { MapMessage msg = (MapMessage) message; try { String email = msg.getString("email"); String phone = msg.getString("phone"); System.out.println(email + "," + phone); } catch (JMSException e) { e.printStackTrace(); } } });
// 保持监听器的运行 while(true){}
} } |
3. 消费者2 |
同消费者1代码一样。 |
4. 测试 (先运行消费者) |
4.4.2 主题消息
l 需求:有10个Topic主题消息,2个消费者,
l 每个消费者消费多少个消息?
10个。 Topic主题消息特点,一个消息可以被多个消费者消费。
1.发送10个消息 |
|
2.消费者1 |
|
3. 消费者2 |
|
4. 测试 (先运行消费者) |
|
5 Spring整合ActiveMQ (重要)
5.1 整合配置,实现发送消息
5.1.1 发送消息配置
l 关键点
n 创建ActiveMQ连接工厂
n 创建缓存工厂
n 创建JmsTemplate
l 配置实现
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- Spring整合ActiveMQ,实现消息发送 1. 创建ActiveMQ连接工厂 2. 创建缓存工厂 3. 创建JmsTemplate --> <!-- 1. 创建ActiveMQ连接工厂 --> <amq:connectionFactory id="amqConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>
<!-- 2. 创建缓存工厂 --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 注入连接工厂--> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- session缓存数目 --> <property name="sessionCacheSize" value="5"></property> </bean>
<!-- 3. 创建JmsTemplate(发送消息的模板工具类对象) -->
<!-- 3.1发送Queue队列消息 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 注入缓存工厂 --> <property name="connectionFactory" ref="cachingConnectionFactory"></property> <!-- 默认值 --> <property name="pubSubDomain" value="false"></property> </bean>
<!-- 3.2发送Topic主题消息 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 注入缓存工厂 --> <property name="connectionFactory" ref="cachingConnectionFactory"></property> <!-- 设置消息模型为主题消息 --> <property name="pubSubDomain" value="true"></property> </bean> </beans> |
5.1.2 发送消息代码
l 通过JmsTemplate,实现消息发送
package cn.itcast.jms.test3_spring;
import javax.annotation.Resource; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.Session;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/** * Spring整合ActiveMQ发送消息 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext-mq-send.xml") publicclass Sender {
// 注入发送Queue、Topic消息的模板对象 @Resource private JmsTemplate jmsQueueTemplate; @Resource private JmsTemplate jmsTopicTemplate;
@Test publicvoid sender() throws Exception { // 发送Queue队列消息 jmsQueueTemplate.send("email", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("email", "[email protected]"); returnmapMessage; } });
// 发送Topic主题消息 jmsTopicTemplate.send("phone", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("phone", "18665591009"); returnmapMessage; } });
} } |
|
5.2 整合配置,实现消费消息
5.2.1 创建ilcps_jms 消息处理系统
l 继承父项目
5.2.2 写监听器类
/** * 消息消费监听器,监听Queue队列中的email信息 * @author Administrator * */ publicclass EmailListener implements MessageListener{
@Override publicvoid onMessage(Message msg) { try { // 1. 转换 MapMessage mapMessage = (MapMessage) msg;
// 2. 根据key,获取消息中的数据 String email = mapMessage.getString("email");
// 3. 业务处理(发邮件) System.out.println("消息处理成功--->" + email); } catch (JMSException e) { e.printStackTrace(); } }
}
|
/** * 消息消费监听器,监听Topic主题中的phone信息 * @author Administrator * */ publicclass PhoneListener implements MessageListener{
@Override publicvoid onMessage(Message msg) { try { // 1. 转换 MapMessage mapMessage = (MapMessage) msg;
// 2. 根据key,获取消息中的数据 String phone = mapMessage.getString("phone");
// 3. 业务处理(发短信) System.out.println("消息处理成功--->" + phone); } catch (JMSException e) { e.printStackTrace(); } }
}
|
5.2.3 Spring整合ActiveMQ 消费消息配置
5.2.3.1 参考
<!-- 配置消息监听器类,监听队列或主题消息模型中的消息。从而实现消费消息。 jms:listener-container destination-type 监听的JMS消息类型(queue、topic) connection-factory Spring的缓存连接工厂 jms:listener destination 对应MQ中队列名称或主题名称 rel 消息监听器类(实现MessageListener接口) --> <jms:listener-container destination-type="queue" connection-factory="cachingFactory"> <jms:listener destination="love" ref=""/> </jms:listener-container>
|
l 2. 配置说明
属性 | 描述 |
container-type | 监听器容器的类型。可用的选项是: default、simple、default102 或者 simple102 (默认值是 'default')。 |
connection-factory | JMS ConnectionFactory Bean的引用(默认的Bean名称是 'connectionFactory')。 |
task-executor | JMS监听器调用者Spring TaskExecutor 的引用。 |
destination-resolver | DestinationResolver 策略的引用,用以解析JMS Destinations。 |
message-converter | MessageConverter 策略的引用,用以转换JMS Messages 成监听器方法的参数。默认值是 SimpleMessageConverter。 |
destination-type | 监听器的JMS目的地类型。可用的选项包含: queue、topic 或者 durableTopic (默认值是 'queue')。 |
client-id | 这个监听器容器在JMS客户端的id。 |
acknowledge | 本地JMS应答模式。可用的选项包含: auto、client、dups-ok 或者 transacted (默认值是 'auto')。 'transacted' 的值可**本地事务性 Session。 也可以通过指定下面介绍的 transaction-manager 属性。 |
transaction-manager | Spring PlatformTransactionManager 的引用。 |
concurrency | 每个监听器可**的Session最大并发数。 |
prefetch | 加载进每个Session的最大消息数。记住增加这个值会造成并发空闲。 |
5.2.3.2 完整配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- Spring整合ActiveMQ,实现消息消费 1. 创建ActiveMQ连接工厂 2. 创建缓存工厂 3. 创建监听器 --> <!-- 1. 创建ActiveMQ连接工厂 --> <amq:connectionFactory id="amqConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>
<!-- 2. 创建缓存工厂 --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 注入连接工厂--> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- session缓存数目 --> <property name="sessionCacheSize" value="5"></property> </bean>
<!-- 3. 监听器配置 --> <!-- 创建监听器对象 --> <bean id="emailListener" class="cn.itcast.listener.EmailListener"></bean> <bean id="phoneListener" class="cn.itcast.listener.PhoneListener"></bean>
<!-- 配置消息监听器类,监听队列或主题消息模型中的消息。从而实现消费消息。 jms:listener-container destination-type 监听的JMS消息类型(queue、topic) connection-factory Spring的缓存连接工厂 jms:listener destination 对应MQ中队列名称或主题名称 rel 消息监听器类(实现MessageListener接口) -->
<!-- 3.1 监听指定名称(email)的队列中的消息--> <jms:listener-container destination-type="queue" connection-factory="cachingConnectionFactory"> <jms:listener destination="email" ref="emailListener"/> </jms:listener-container>
<!-- 3.2 监听指定名称(email)的主题中的消息 --> <jms:listener-container destination-type="topic" connection-factory="cachingConnectionFactory"> <jms:listener destination="phone" ref="phoneListener"/> </jms:listener-container> </beans> |
|
5.2.4 消费消息测试
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext-mq-send.xml") publicclass ConsumeApp {
// 保持监听器运行 @Test publicvoid run() throws Exception { while(true){} } } |
6 小结
(1) Jms Java 消息服务
作用: 在分布式系统中,通过java的消息服务,把一些占用系统资源的独立的业务抽取出去,通过消息处理系统完成这块业务的实现。从而提供系统的处理能力。
(2) 核心概念
a) 消息模型
Queue 队列消息模型
Topic 主题消息模型
b) 消息消费的2种方式
i. receive()
ii. MessageListener()
c) Jms编程模型(Api)
包: javax.jms.*
i. ConnectionFactroy
ii. Destination
iii. Session
创建生产者、消费者、目的地、消息
iv. Connection
v. MessageProducer
vi. MessageConsumer
vii. Message
(3) 发送消费Queue队列消息
(4) 发送消费Topic主题消息
(5) 测试:多个消费者
(6) Spring整合ActiveMQ
a) 发消息
i. 配置
ConnectionFactory、CachingConnectionFactory、JmsTemplate
ii. 测试
b) 消费消息
i. 写监听器(获取消息Message内容)
ii. 配置
<jms:listener-container>
iii. 测试
启动tomcat服务器,监听器运行,就可以监听容器中消息。