消息队列 ActiveMQ·基础篇

一: ActiveMQ介绍

ActiveMQ 是一个 **MOM**,具体来说是一个实现了 **JMS 规范**的**系统间远程通信的消息代理**。

MOM 就是面向消息中间件(Message-oriented middleware),是用于以分布式应用或系统中的异步、松耦合、可靠、可扩展和安全通信的一类软件。**MOM 的总体思想是它作为消息发送器和消息接收器之间的消息中介**,这种中介提供了一个全新水平的松耦合

JMS 即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
消息队列 ActiveMQ·基础篇
二:消息队列

消息队列 全称为Message Queue,消息队列(MQ)是正确而又完整的 JMS 实现,消息队列(MQ)是一种应用程序对应用程序的通信方法。
应用程序,通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过
在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

三:应用场景

  1. 异步处理
 场景说明:新用户注册发放100积分,180元新手大礼包,**会员卡,传统的做法有两种:串行方式,并行方式。 
  • 串行方式
    消息队列 ActiveMQ·基础篇
  • 使用消息队列 并行方式
    消息队列 ActiveMQ·基础篇
以上两种方式,很容易发现同步处理(串行方式)的情况下都会涉及到非主业务的其他操作,整个执行流程大概需要200ms,等待时间过长,
正常逻辑其实注册的的主流程不应该受其他事件影响,通过消息队列的方式,可以把后续的处理流程进行异步处理可以大大提高响应速度,
由原来的200ms变成现在的50ms,后面发放100积分,180元新手大礼包,**会员卡不影响主业务逻辑

2:应用解耦

场景说明: 业务系统之间合作,两套不同企业系统,会员信息对接数据共享,常常的方式都是通过三方接口来实现数据信息交换
  • 传统企业:
根据业务需求修改业务逻辑,来实现数据同步,导致代码相互交叉,耦合度太高,需要花费大量的时间整合,一旦三方接口出现问题影响主业务

消息队列 ActiveMQ·基础篇

  • 消息队列:
1.注册完成然后将消息写入队列返回成功。
2.发放权益业务不影响主业务,实现解耦

消息队列 ActiveMQ·基础篇
四:JMS两种消息传送模式

P2P (点对点)消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。
Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用MessageConsumer.setMessageListener() 注册一个 
MessageListener 实现异步接收。多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 
Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。

消息队列 ActiveMQ·基础篇

Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。
发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。
除非显式指定,否则 topic 不会为订阅者保留消息。当然,这可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 
Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。

消息队列 ActiveMQ·基础篇
queue消息生产者代码

//创建步骤

//第一步:获取工厂连接
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//第二步:通过工厂创建连接
Connection connection = factory.createConnection();
//开启连接
connection.start();
//创建回话session  
//参数一:是否对事物支持: true  参数二值被忽略  默认值为:SESSION_TRANSACTED 
//Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
//参数一:是否对事物支持: false   
//Session.AUTO_ACKNOWLEDGE:为自动确认,客户端发送和接收消息不需要做额外的工作。
//Session.CLIENT_ACKNOWLEDGE:为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。 
//Session.DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,
//							   会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
//SESSION_TRANSACTED
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建会话对象类型
Queue queue = session.createQueue("test-queue");
//使用会话创建生产对象
MessageProducer producer = session.createProducer(queue);
//创建会话消息对象
TextMessage message = session.createTextMessage("我是test-queue测试数据");
//发送消息
producer.send(message);

producer.close();
session.close();
connection.close();

消息队列 ActiveMQ·基础篇
queue消息消费者代码

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test-queue");
MessageConsumer consumer = session.createConsumer(queue);
//接收消息方法一:同步
Message message = consumer.receive();
System.out.println(message);

/*
 接收消息方法二:异步
 consumer.setMessageListener(new MessageListener() {
	
	@Override
	public void onMessage(Message message) {
		TextMessage textMessage = (TextMessage) message;
		try {
			System.out.println(textMessage.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
});*/
//程序等待接收用户消息
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();

topic消息生产者代码

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");			
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test-topic");
MessageProducer producer = session.createProducer(topic);
//producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage("发布订阅模式!");
producer.send(message);
producer.close();
session.close();
connection.close();

topic消息消费者代码

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test-topic");
MessageConsumer consumer = session.createConsumer(topic);

consumer.setMessageListener(new MessageListener() {
	@Override
	public void onMessage(Message message) {
		TextMessage text = (TextMessage)message;
		System.out.println(text);
	}
});

System.in.read();
consumer.close();
session.close();
connection.close();