ActiveMQ

一:基本概念

ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了 JMS 1.1 并提供了很多附加的特性,比如 JMX 管理、主从管理、消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等等。

二:基本组件

Broker,消息代理,表示消息队列服务器实体,接受客户端连接,提供消息通信的核心服务。

Producer,消息生产者,业务的发起方,负责生产消息并传输给 Broker。

Consumer,消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理。

Topic,主题,发布订阅模式下的消息统一汇集地,不同生产者向 Topic 发送消息,由 Broker 分发到不同的订阅者,实现消息的广播。

Queue,队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理。

Message,消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。

三:连接器

ActiveMQ Broker 的主要作用是为客户端应用提供一种通信机制,为此 ActiveMQ 提供了一种连接机制,并用连接器(connector)来描述这种连接机制。ActiveMQ 中连接器有两种,一种是用于客户端与消息代理服务器(client-to-broker)之间通信的传输连接器(transport connector),一种是用于消息代理服务器之间(broker-to-broker)通信的网络连接器(network connector)。

1、传输连接器:

传输连接器为了交换消息,消息生产者和消息消费者(统称为客户端)都需要连接到消息代理服务器,这种客户端和消息代理服务器之间的通信就是通过传输连接器(Transport connectors)完成的。很多情况下用户连接消息代理时的需求侧重点不同,有的更关注性能,有的更注重安全性,因此 ActiveMQ 提供了一系列l连接协议供选择,来覆盖这些使用场景。从消息代理的角度看,传输连接器就是用来处理和监听客户端连接的,查看 ActiveMQ demo 的配置文件(/examples/conf/activemq-demo.xml),传输连接的相关配置如下:

ActiveMQ

 

传输连接器定义在<transportConnectors>元素中,一个<transportConnector>元素定义一个特定的连接器,一个连接器必须有自己唯一的名字和URI属性,目前在 ActiveMQ 最新的5.15版本中常用的传输连接器连接协议有:vm、tcp、udp、nio、ssl、http、https、websocket、amqp 等等。

Vm:允许客户端和消息服务器直接在 VM 内部通信,采用的连接不是 Socket 连接,而是直接的虚拟机本地方法调用,从而避免网络传输的开销。应用场景仅限于服务器和客户端在同一 JVM 中。

Tcp:客户端通过 TCP 连接到远程的消息服务器。

Udp:客户端通过 UDP 连接到远程的消息服务器。

Nio:nio 和 tcp 的作用是一样的,只不过 nio 使用了 java 的 NIO包,这可能在某些场景下可提供更好的性能。

Ssl:ssl 允许用户在 TCP 的基础上使用 SSL。

http和https:允许客户端使用 REST 或 Ajax 的方式进行连接,这意味着可以直接使用 Javascript 向 ActiveMQ 发送消息。

Websocket:允许客户端通过 HTML5 中的 WebSocket 方式连接到消息服务器。

2、网络连接器:

网络连接器很多情况下,我们要处理的数据可能是海量的,这种场景单台服务器很难支撑,这就要用到集群功能,为此 ActiveMQ 提供了网络连接的模式,简单说就是通过把多个消息服务器实例连接在一起作为一个整体对外提供服务,从而提高整体对外的消息服务能力。通过这种方式连接在一起的服务器实例之间可共享队列和消费者列表,从而达到分布式队列的目的,网络连接器就是用来配置服务器之间的通信。

ActiveMQ

如图所示,服务器 S1 和 S2 通过 NewworkConnector 相连,生产者 P1 发送的消息,消费者 C3 和 C4 都可以接收到,而生产者 P3 发送的消息,消费者 C1 和 C2 也可以接收到。要使用网络连接器的功能需要在服务器 S1 的 activemq.xml 中的 broker 节点下添加如下配置(假设192.168.11.23:61617 为 S2 的地址):

<networkConnectors>
     <networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>

如果只是这样,S1 可以将消息发送到 S2,但这只是单方向的通信,发送到 S2 上的的消息还不能发送到 S1 上。如果想 S1 也收到从 S2 发来的消息需要在 S2 的 activemq.xml 中的 broker 节点下也添加如下配置(假设192.168.11.45:61617为 S1 的地址):

<networkConnectors>

     <networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>

这样,S1和S2就可以双向通信了。目前在 ActiveMQ 最新的5.15版本中常用的网络连接器协议有 static 和 multicast 两种。

四:p2p和pub/sub的区别

P2P (点对点)消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。

Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收。

多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。

ActiveMQ

Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。

除非显式指定,否则 topic 不会为订阅者保留消息。当然,这可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。

ActiveMQ

五:应用场景

1、异步处理:

好处:正确使用消息中间件将非核心业务逻辑功能异步处理,可以提高系统的响应效率,提高了CPU的吞吐量,改善用户的体验。

(1)、在传统的系统架构,用户从注册到跳转成功页面,中间需要等待邮件发送的业务逻辑耗时。这不仅影响系统响应时间,降低了CPU吞吐量,同时还影响了用户的体验。
(2)、通过消息中间件将邮件发送的业务逻辑异步处理,用户注册成功后发送数据到消息中间件,再跳转成功页面,邮件发送的逻辑再由订阅该消息中间件的其他系统负责处理。
(3)、消息中间件的读写速度非常的快,其中的耗时可以忽略不计。通过消息中间件可以处理更多的请求。

2、流量削锋:

流量削锋也称限流。在秒杀,抢购的活动中,为了不影响整个系统的正常使用,一般会通过消息中间件做限流,避免流量突增压垮系统,前端页面可以提示"排队等待",即便用户体验很差,也不能让系统垮掉。

好处:限流可以在流量突增的情况下保障系统的稳定。

3、系统耦合

可以让各系统之间耦合性降低,不会因为其他系统的异常影响到自身业务逻辑。各尽其职,比如订单系统只需负责将订单数据持久化到数据库中,仓库系统只需负责更新库存,不会因为仓库系统的原因从而影响到下单的流程。

六:消息存储

JMS 规范中消息的分发方式有两种:非持久化和持久化。对于非持久化消息 JMS 实现者须保证尽最大努力分发消息,但消息不会持久化存储;而持久化方式分发的消息则必须进行持久化存储。非持久化消息常用于发送通知或实时数据,当你比较看重系统性能并且即使丢失一些消息并不影响业务正常运作时可选择非持久化消息。持久化消息被发送到消息服务器后如果当前消息的消费者并没有运行则该消息继续存在,只有等到消息被处理并被消息消费者确认之后,消息才会从消息服务器中删除。

对以上这两种方式 ActiveMQ 都支持,并且还支持通过缓存在内存中的中间状态消息的方式来恢复消息。概括起来看 ActiveMQ 的消息存储有三种:存储到内存、存储到文件、存储到数据库。具体使用上 ActiveMQ 提供了一个插件式的消息存储机制,类似于消息的多点传播,主要实现了如下几种:

1、AMQ,是 ActiveMQ 5.0及以前版本默认的消息存储方式,它是一个基于文件的、支持事务的消息存储解决方案。在此方案下消息本身以日志的形式实现持久化,存放在 Data Log 里。并且还对日志里的消息做了引用索引,方便快速取回消息。

<persistenceAdapter>

        <amqPersistenceAdapter

                directory="${activemq.data}/kahadb"

                syncOnWrite="true"

                indexPageSize="16kb"

                indexMaxBinSize="100"

                maxFileLength="10mb" />

</persistenceAdapter>

2、KahaDB,也是一种基于文件并具有支持事务的消息存储方式,从5.3开始推荐使用 KahaDB 存储消息,它提供了比 AMQ 消息存储更好的可扩展性和可恢复性。

要启用 KahaDB 存储,需要在 activemq.xml 中进行以下配置:

<broker brokerName="broker" persistent="true" useShutdownHook="false">

<persistenceAdapter>

  <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>

   </persistenceAdapter>

</broker>

3、JDBC,基于 JDBC 方式将消息存储在数据库中,将消息存到数据库相对来说比较慢,所以 ActiveMQ 建议结合 journal 来存储,它使用了快速的缓存写入技术,大大提高了性能。

4、内存存储,是指将所有要持久化的消息放到内存中,因为这里没有动态的缓存,所以需要注意设置消息服务器的 JVM 和内存大小。

<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">

        <transportConnectors>

                <transportConnector uri="tcp://localhost:61635"/>

        </transportConnectors>

</broker>

5、LevelDB,5.6版本之后推出了 LevelDB 的持久化引擎,它使用了自定义的索引代替常用的 BTree 索引,其持久化性能高于 KahaDB,虽然默认的持久化方式还是 KahaDB,但是 LevelDB 将是趋势。在5.9版本还提供了基于 LevelDB 和 Zookeeper 的数据复制方式,作为 Master-Slave 方式的首选数据复制方案。