RocketMQ 源码分析 05 消息存储

 Broker 接收到生产者发送消息请求后如何存储在 Broker 上

 核心实现类DefaultMessageStore

先看核心方法putMessage

RocketMQ 源码分析 05 消息存储

1.校验不能是slave,只允许写master

2.校验message store是可写的

3.校验mq的topic长度不能超过最大限制

4.校验mq属性不能过长

5.检测操作系统页写入是否繁忙

6.将日志写入CommitLog 文件,具体实现类 CommitLog

7.记录相关统计信息

8.记录写commitlog 失败次数

CommitLog.putMessage 核心实现

RocketMQ 源码分析 05 消息存储

通过mappedFile写入文件

1.初始化 FileChannel、mappedByteBuffer 等。

2.获取当前写入位置。

3.根据消息类型,是批量消息还是单个消息,进入相应的处理。

4.消息写入实现。

RocketMQ 源码分析 05 消息存储

5.创建msgId

RocketMQ 源码分析 05 消息存储

6.根据 topic-queryId 获取该队列的偏移地址(待写入的地址),如果没有,新增一个键值对,当前偏移量为 0。

7.对事务消息需要单独特殊的处理(PREPARE,ROLLBACK类型的消息,不进入Consume队列)。

8.消息的附加属性长度不能超过65536个字节。

9. 计算消息存储长度,消息存储格式:

10 .如果消息长度超过配置的消息总长度,则返回 MESSAGE_SIZE_EXCEEDED。

11. 如果该 MapperFile 中可剩余空间小于当前消息存储空间,返回END_OF_FILE。

12. 将消息写入MapperFile中(内存中)。

13. 重点讲解一下AppendMessageResult方法。

RocketMQ 源码分析 05 消息存储

消息刷盘的实现,分成同步刷盘,异步刷盘

1.同步刷盘实现

RocketMQ 源码分析 05 消息存储

从上面的代码看,我们可以把 doCommit 方法当成业务方法,在 run 方法的循环被调用,每执行完一次 doCommit 等待10毫秒,这也是 waitForRunning 的核心逻辑,doCommit 中的任务是通过调用如下方法:

10ms执行一次提交doCommit

刷盘具体实现:MappedFileQueue。

1. 根据上次刷新的位置,得到当前的 MappedFile 对象。

2. 执行 MappedFile 的 flush 方法。

3.更新上次刷新的位置。

刷写的实现逻辑就是调用 FileChannel 或 MappedByteBuffer 的force 方法

RocketMQ 源码分析 05 消息存储

根据上次刷盘偏移量,找到当前待刷盘mappedFile对象

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

异步刷盘

2.2 异步刷盘

异步刷盘机制,实现原理很简单,就是按照配置的周期定时提交信息到 MappedFile,定时刷写到磁盘,我们重点关注如下几个配置项。

RocketMQ 源码分析 05 消息存储

1)消息追加,也就是将消息追加到 CommitLog 文件对应的内存映射区(本过程是加锁的,非并发;2)刷盘阶段(并发)就是将内存区数据刷写到磁盘文件(支持同步、异步刷盘);3)主从同步处理(并发)。