RocketMQ

RocketMQ知识点

1、简介

(1)典型的消息中间件收发消息的模型

RocketMQ

(2)RocketMQ是阿里推出的一款开源的消息中间件。其前身是MetaQ,全名是Metamorphosis。它并不遵循任何规范(比如JMS、AMQP等),但是参考了各种规范与同类产品的设计思想,主要借鉴的产品是Apache Kafka。

2、特性

(1)它是一个队列模型的消息中间件,具有高性能、高可靠、高实时、低延时、分布式、支持异步、同步双写特点。

(2)灵活可扩展性

RocketMQ 天然支持集群,其核心四组件(Name Server、Broker、Producer、Consumer)每一个都可以在没有单点故障的情况下进行水平扩展。

(3)能够保证严格的消息顺序

可以保证消息消费者按照消息发送的顺序对消息进行消费。顺序消息分为全局有序和局部有序,一般推荐使用局部有序,即生产者通过将某一类消息按顺序发送至同一个队列来实现。

(4)多种消息过滤方式

消息过滤分为在服务器端过滤和在消费端过滤。服务器端过滤时可以按照消息消费者的要求(支持topic类型、tag、语法表达式【filter的方式等】过滤)做过滤,优点是减少不必要消息传输,缺点是增加了消息服务器的负担,实现相对复杂。消费端过滤则完全由具体应用自定义实现,这种方式更加灵活,缺点是很多无用的消息会传输给消息消费者。

(5)海量消息堆积能力(亿级)

RocketMQ 采用零拷贝原理实现超大的消息的堆积能力,据说单机已可以支持亿级消息堆积,而且在堆积了这么多消息后依然保持写入低延迟。

(6)支持事务消息

RocketMQ 除了支持普通消息,顺序消息之外还支持事务消息(根据offset更改msg状态),这个特性对于分布式事务来说提供了又一种解决思路。

(7)回溯消费(独有)

回溯消费是指消费者已经消费成功的消息,由于业务上需求需要重新消费,RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

(8)定时消息(重点)

在发送消息是也可以针对message设置setDelayTimeLevel,支持级别:5s,5s,1m。

(9)实时的消息发布/订阅机制

(10)提供丰富的消息拉取模式

使用长轮询Pull方式长轮询详解,可保证消息非常实时,消息实时性不低于Push。

(11)较少的依赖

(12)支持队列优先级,数字表达

(13)持久化

充分利用linux系统内存cache提升性能。

(14)消息失败重试机制

针对provider的重试,当消息发送到选定的broker时如果出现失败会自动选择其他的broker进行重发,默认重试三次,当然重试次数要在消息发送的超时时间范围内。

针对consumer的重试,如果消息因为各种原因没有消费成功,会自动加入到重试队列,一般情况如果是因为网络等问题连续重试也是照样失败,所以rocketmq也是采用阶梯重试的方式。

3、基础概念

生产者

生产者(Producer)负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。 RocketMQ 提供了三种方式发送消息:同步、异步和单向。

同步发送

同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。

异步发送

异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。

单向发送

单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

生产者组

生产者组(Producer Group)是一类 Producer 的集合,这类 Producer 通常发送一类消息并且发送逻辑一致,所以将这些 Producer 分组在一起。从部署结构上看生产者通过 Producer Group 的名字来标记自己是一个集群。

消费者

消费者(Consumer)负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。站在用户应用的角度消费者有两种类型:拉取型消费者、推送型消费者。

拉取型消费者

拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。

推送型消费者

推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。

消费者组

消费者组(Consumer Group)一类 Consumer 的集合名称,这类 Consumer 通常消费同一类消息并且消费逻辑一致,所以将这些 Consumer 分组在一起。消费者组与生产者组类似,都是将相同角色的分组在一起并命名,分组是个很精妙的概念设计,RocketMQ 正是通过这种分组机制,实现了天然的消息负载均衡。消费消息时通过 Consumer Group 实现了将消息分发到多个消费者服务器实例,比如某个 Topic 有9条消息,其中一个 Consumer Group 有3个实例(3个进程或3台机器),那么每个实例将均摊3条消息,这也意味着我们可以很方便的通过加机器来实现水平扩展。

消息服务器

消息服务器(Broker)是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。从部署结构图中可以看出 Broker 有 Master 和 Slave 两种类型,Master 既可以写又可以读,Slave 不可以写只可以读。从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多 Slave(同步刷盘)、多 Master多 Slave(异步刷盘)。

单 Master

这种方式一旦 Broker 重启或宕机会导致整个服务不可用,这种方式风险较大,所以显然不建议线上环境使用。

多 Master

所有消息服务器都是 Master ,没有 Slave 。这种方式优点是配置简单,单个 Master 宕机或重启维护对应用无影响。缺点是单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受影响。

多 Master 多 Slave(异步复制)

每个 Master 配置一个 Slave,所以有多对 Master-Slave,消息采用异步复制方式,主备之间有毫秒级消息延迟。这种方式优点是消息丢失的非常少,且消息实时性不会受影响,Master 宕机后消费者可以继续从 Slave 消费,中间的过程对用户应用程序透明,不需要人工干预,性能同多 Master 方式几乎一样。缺点是 Master 宕机时在磁盘损坏情况下会丢失极少量消息。

多 Master 多 Slave(同步双写)

每个 Master 配置一个 Slave,所以有多对 Master-Slave ,消息采用同步双写方式,主备都写成功才返回成功。这种方式优点是数据与服务都没有单点问题,Master 宕机时消息无延迟,服务与数据的可用性非常高。缺点是性能相对异步复制方式略低,发送消息的延迟会略高。

名称服务器

名称服务器(NameServer)用来保存 Broker 相关元信息并给 Producer 和 Consumer 查找 Broker 信息。NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。所以从功能上看应该是和 ZooKeeper 差不多,据说 RocketMQ 的早期版本确实是使用的 ZooKeeper ,后来改为了自己实现的 NameServer 。

消息

消息(Message)就是要传输的信息。一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 key 并在 Broker 上查找此消息以便在开发期间查找问题。

主题

主题(Topic)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0个、1个、多个消费者订阅。

标签

标签(Tag)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

消息队列

消息队列(Message Queue),主题被划分为一个或多个子主题,即消息队列。一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。

消息消费模式

消息消费模式有两种:集群消费(Clustering)和广播消费(Broadcasting)。默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。而广播消费消息会发给消费者组中的每一个消费者进行消费。

消息顺序

消息顺序(Message Order)有两种:顺序消费(Orderly)和并行消费(Concurrently)。顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

4、消息生命周期

对于任意一个消息中间件来说,消息都需要经过消息生产-消息存储-消息消费三个过程。其中消息生产包括了Producer端的消息构造和消息发送,消息存储包括了Broker端的消息接收和消息落地,消息消费包括了Consumer端的消息接收和消息处理。不同消息中间件之所以功能不同,性能差异较大,其主要原因就是消息生命周期中各个阶段的处理方式不同。下面介绍一下RocketMQ在各个阶段的处理方式。

消息生产

消息的生产者Producer在发送消息时,会从Name Server上获取消息的目的地(Topic)在各个Broker上的状态,如果发现同一个Broker下的Topic有多个Queue(队列),则会根据RoundBin算法依次向每个Queue发送消息,此外,如果发现多个Broker上均有相同Topic,也会依照轮询的方式依次向这些Broker发送消息。

消息存储

RocketMQ的消息存储是由consume queue和commit log配合完成的。

consume queue是消息的逻辑队列,相当于字典的目录,用于指定消息在物理文件commit log中的位置。每一个queue都有一个对应的consume queue文件。consume queue中存放的是一串20字节定长的二进制数据,顺序写顺序读。
Commit Log是消息存放的物理文件,每台Broker上的Commit Log被本机所有的queue共享,不做任何区分。CommitLog中消息存储单元长度不固定,文件顺序写,随机读。

简而言之,Broker端在收到一条消息后,如果是消息需要落盘,则会在Commit Log中写入整条消息,并在consume queue中写入该消息的索引信息。消息被消费时,则根据consume queue中的信息去Commit Log中获取消息。RocketMQ在消息被消费后,并不会去Commit Log中删除消息,而是会保存3天(可配置)而后批量删除。

RocketMQ支持同步刷盘及异步刷盘两种模式,同步刷盘指的是Producer将消息发送至Broker后,等待消息刷入Commit Log和consume queue后才算消息发送成功,而异步刷盘则是将消息发送至Broker后,Broker将消息放入内存则告知Producer消息发送成功,而后由Broker自行将内存中的消息批量刷入磁盘。

消息消费

RocketMQ消息订阅有两种模式,一种是Push模式,即Broker主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到Broker拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

与Kafka类似,Kafka中Consumer数量不能大于Partition数量,而在RocketMQ中Consumer的数量也不能大于队列(Queue)的数量,如果Consumer超过队列数量,那么多余的Consumer将不能消费消息。可以简单理解为queue与consumer的关系是多对一的关系。

RocketMQ
Topic & message queue:一个分布式消息队列中间件部署好以后,可以给很多个业务提供服务,同一个业务也有不同类型的消息要投递,这些不同类型的消息以不同的 Topic 名称来区分。所以发送和接收消息前,先创建topic,针对某个 Topic 发送和接收消息。有了 Topic 以后,还需要解决性能问题 。 如果一个Topic 要发送和接收的数据量非常大, 需要能支持增加并行处理的机器来提高处理速度,这时候一个 Topic 可以根据需求设置一个或多个 Message Queue, Message Queue 类似分区或 Partition 。Topic有了多个 Message Queue 后,消息可以并行地向各个Message Queue 发送,消费者也可以并行地从多个 Message Queue 读取消息并消费 。

5、物理部署结构

RocketMQ
RocketMQ使用Name Server集群加Broker集群的方式来搭建。

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Name Server集群:提供topic的路由信息,路由信息数据存储在内存中,broker会定时的发送路由信息到nameserver中的每一个机器,来进行更新,所以name server集群可以简单理解为无状态(实际情况下可能存在每个nameserver机器上的数据有短暂的不一致现象,但是通过定时更新,大部分情况下都是一致的)。

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。

Broker集群:一个集群有一个统一的名字,即brokerClusterName,默认是DefaultCluster。一个集群下有多个master,每个master下有多个slave。master和slave算是一组,拥有相同的brokerName,不同的brokerId,master的brokerId是0,而slave则是大于0的值。master和slave之间可以进行同步复制或者是异步复制。

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Producer集群:拥有相同的ProducerGroup,一般来讲,Producer不必要有集群的概念,这里的集群仅仅在RocketMQ的分布式事务中有用到。

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

consumer集群:拥有相同的consumerGroup。

Message Filter:可以实现高级的自定义的消息过滤。

RocketMQ
分区与队列的区别

为提高消息读写并发能力,将一个topic消息进行拆分,kafka称为分区,rocketmq称为队列。

对于kafka:为了防止一个分区的消息文件过大,会拆分成一个个固定大小的文件,所以一个分区就对应了一个目录,分区与分区之间是相互隔离的。

对于RocketMQ:则是所有topic的数据混在一起进行存储,默认超过1G的话,则重新创建一个新的文件。消息的写入过程即写入该混杂的文件中,然后又有一个线程服务,在不断的读取分析该混杂文件,将消息进行分拣,然后存储在对应队列目录中(存储的是简要信息,如消息在混杂文件中的offset,消息大小等)。所以RocketMQ需要2次寻找,第一次先找队列中的消息概要信息,拿到概要信息中的offset,根据这个offset再到混杂文件中找到想要的消息。而kafka则只需要直接读取分区中的文件即可得到想要的消息。

producer端发现

Producer端如何来发现新的broker地址?

对于kafka来说:Producer端需要配置broker的列表地址,Producer也从一个broker中来更新broker列表地址(从中发现新加入的broker)。

对于RocketMQ来说:Producer端需要Name Server的列表地址,同时还可以定时从一个HTTP地址中来获取最新的Name Server的列表地址,然后从其中的一台Name Server来获取全部的路由信息,从中发现新的broker。

消费offset的存储

对于kafka:Consumer将消费的offset定时存储到ZooKeeper上,利用ZooKeeper保障了offset的高可用问题。

对于RocketMQ:Consumer将消费的offset定时存储到broker所在的机器上,这个broker优先是master,如果master挂了的话,则会选择slave来存储,broker也是将这些offset定时刷新到本地磁盘上,同时slave会定时的访问master来获取这些offset。

负载均衡

对于consumer负载均衡,在出现分区或者队列增加或者减少的时候、Consumer增加或者减少的时候都会进行reblance操作。

对于RocketMQ:客户端自己会定时对所有的topic的进行reblance操作,对于每个topic,会从broker获取所有Consumer列表,从broker获取队列列表,按照负载均衡策略,计算各自负责哪些队列。这种就要求进行负载均衡的时候,各个Consumer获取的数据是一致的,不然不同的Consumer的reblance结果就不同。

对于kafka:kafka之前也是客户端自己进行reblance,依靠ZooKeeper的监听,来监听上述2种情况的出现,一旦出现则进行reblance。现在的版本则将这个reblance操作转移到了broker端来做,不但解决了RocketMQ上述的问题,同时减轻了客户端的操作,客户端更加轻量级,减少了和其它语言集成的工作量。

备注

Name Server和ZooKeeper?

Name Server和ZooKeeper的作用大致是相同的。

Name Server:从宏观上来看,Name Server做的东西很少,就是保存一些运行数据,Name Server之间不互连,这就需要broker端连接所有的Name Server,运行数据的改动要发送到每一个Name Server来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了Name Server很轻量级,但是broker端就要做更多的东西了。

ZooKeeper:broker只需要连接其中的一台机器,运行数据分发、一致性都交给了ZooKeeper来完成。