RocketMQ 源码分析 05 消息存储
Broker 接收到生产者发送消息请求后如何存储在 Broker 上
核心实现类DefaultMessageStore
先看核心方法putMessage
1.校验不能是slave,只允许写master
2.校验message store是可写的
3.校验mq的topic长度不能超过最大限制
4.校验mq属性不能过长
5.检测操作系统页写入是否繁忙
6.将日志写入CommitLog 文件,具体实现类 CommitLog
7.记录相关统计信息
8.记录写commitlog 失败次数
CommitLog.putMessage 核心实现
通过mappedFile写入文件
1.初始化 FileChannel、mappedByteBuffer 等。
2.获取当前写入位置。
3.根据消息类型,是批量消息还是单个消息,进入相应的处理。
4.消息写入实现。
5.创建msgId
6.根据 topic-queryId 获取该队列的偏移地址(待写入的地址),如果没有,新增一个键值对,当前偏移量为 0。
7.对事务消息需要单独特殊的处理(PREPARE,ROLLBACK类型的消息,不进入Consume队列)。
8.消息的附加属性长度不能超过65536个字节。
9. 计算消息存储长度,消息存储格式:
10 .如果消息长度超过配置的消息总长度,则返回 MESSAGE_SIZE_EXCEEDED。
11. 如果该 MapperFile 中可剩余空间小于当前消息存储空间,返回END_OF_FILE。
12. 将消息写入MapperFile中(内存中)。
13. 重点讲解一下AppendMessageResult方法。
消息刷盘的实现,分成同步刷盘,异步刷盘
1.同步刷盘实现
从上面的代码看,我们可以把 doCommit 方法当成业务方法,在 run 方法的循环被调用,每执行完一次 doCommit 等待10毫秒,这也是 waitForRunning 的核心逻辑,doCommit 中的任务是通过调用如下方法:
10ms执行一次提交doCommit
刷盘具体实现:MappedFileQueue。
1. 根据上次刷新的位置,得到当前的 MappedFile 对象。
2. 执行 MappedFile 的 flush 方法。
3.更新上次刷新的位置。
刷写的实现逻辑就是调用 FileChannel 或 MappedByteBuffer 的force 方法
根据上次刷盘偏移量,找到当前待刷盘mappedFile对象
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
异步刷盘
2.2 异步刷盘
异步刷盘机制,实现原理很简单,就是按照配置的周期定时提交信息到 MappedFile,定时刷写到磁盘,我们重点关注如下几个配置项。
1)消息追加,也就是将消息追加到 CommitLog 文件对应的内存映射区(本过程是加锁的,非并发;2)刷盘阶段(并发)就是将内存区数据刷写到磁盘文件(支持同步、异步刷盘);3)主从同步处理(并发)。