Rocketmq特性

概念

producer:消息生产者,负责产生消息,一般由业务系统负责生产消息。

Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费

Topic:消息主题,负责标记一类消息,生产者将消息发送到Topic,消费者从该Topic消费消息

Broker:消息中转角色,负责存储消息,转发消息,一般也称为 Server,在 JMS 规范中称为 Provider

NameServer:服务发现Server,用于生产者和消费者获取Broker的服务;

Rocketmq模块划分

名称 作用
broker broker模块:c和p端消息存储逻辑
client 客户端api:produce、consumer端 接受与发送api
common 公共组件:常量、基类、数据结构
tools 运维tools:命令行工具模块
store 存储模块:消息、索引、commitlog存储
namesrv 服务管理模块:服务注册topic等信息存储
remoting 远程通讯模块:netty+fastjson
logappender 日志适配模块
example Demo列子
filtersrv 消息过滤器模块
srvutil 辅助模块
filter 过滤模块:消息过滤模块
distribution 部署、运维相关zip包中的代码
openmessaging 兼容openmessaging分布式消息模块

Rocketmq高可用

“” 发送消息 发送消息过程中 接收消费消息
停用一个namesrv 不影响通讯 不影响通讯 不影响通讯
停用全部 namesrv 影响通讯 不影响通讯 不影响通讯
通用单个master broker 不影响通讯 不影响通讯 不影响通讯
停用全部master borker 影响通讯 影响通讯,无法恢复 影响通讯
停用任意slave broker 不影响通讯 不影响通讯 不影响通讯
恢复任意master borker 不影响通讯 影响通讯,数秒恢复 不影响通讯,数秒恢复

Producer端

发送方式:

Sync:同步的发送方式,会等待发送结果后返回

Async:异步的发送方式,发送完后,立刻返回结果,client在拿到broker的响应有返回,会回调指定的callback,这个api可以指定timeout,不指定也是 默认3000ms

oneway:发出后不管啥样的结果直接返回

发送结果:

org.apache.rocketmq.client.producer.SendStatus

SEND_OK,:消息发送成功

FLUSH_DISK_TIMEOUT,消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

FLUSH_SLAVE_TIMEOUT,消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

SLAVE_NOT_AVAILABLE,消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢

普通消息

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

1、准备工作 mesasge、网络相关、线程相关

2、从namesrv获取topic路由(缓存机制)

3、组装数据,broker需要的序列化数据(json)

4、Netty发送(源码)

定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。(第三方 job 步长)

固定精度:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel

顺序消息

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl

场景:订单》下单》支付》配送》签收

底层原理:4个队列,一个订单下面不同状态的消息是顺序的只需要发到一个队列中

org.apache.rocketmq.client.producer.MessageQueueSelector如何选择一个队列

Rocketmq特性

事务消息

Rocketmq特性

Rocketmq特性

Consumer端:

消费模型:

org.apache.rocketmq.common.protocol.heartbeat.MessageModel#BROADCASTING

org.apache.rocketmq.common.protocol.heartbeat.MessageModel#CLUSTERING

消费选择:

org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_LAST_OFFSET
第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费

org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_FIRST_OFFSET
第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费

org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_TIMESTAMP
第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费

以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始

消息重复幂等:

RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重
Ps:见开发文档