RocketMQ存储文件浅析

最近公司要求将所有的ActiveMQ迁移到RocketMQ,阅读了RocketMQ的官方文档和部分源码,将自己的了解记录一下。

众所周知,RocketMQ分为四个部分,nameServer,broker,producer,consumer。

nameServer是注册中心,负责服务发现,路由信息存储和变更,broker负责存储消息,producer发送消息,consumer消费消息。

本次主要说明broker如何存储和查询消息

broker主要使用三个文件来存储消息,commitLog,consumerQueue,IndexFile

1.commitLog文件

commitLog文件存储消息的几乎所有信息,存储位置如下图

RocketMQ存储文件浅析

直观来看,commitLog文件使用上图目录结构,存储位置:${ROCKET_HOME}/store/commitlog/${fileName}

其中filename是文件的物理偏移量。例如,第一个文件第一条消息的物理偏移量是0,文件名是0000000000000000000,由于每个文件大小固定是1GB,所以第二个文件是00000000001073741824

存储的数据结构如下图:

RocketMQ存储文件浅析

需要注意的是,图中列出了主要的存储信息,省去了部分不那么重要的信息,例如固定值魔数。

可以看出,每一条消息数据的大小并不是固定的,消息元数据占据的大小存储在totlesize中。

2.consumerQueue文件

consumerQueue文件表示topic下的队列。文件目录如下:

RocketMQ存储文件浅析

存储的目录如图所示:${ROCKET_HOME }/store/consumequeue/{topic}/。文件大小是固定的30w * 20字节,其中每条消息20字节,每个文件存储30w条。

单条数据结构如下图:

RocketMQ存储文件浅析

其中,commitLog offset表示本条消息在commitLog中的偏移量,size表示消息大小,message tag hashcode表示tag参数的哈希值,主要用于过滤。可以看出,如果已知topic和队列,可以根据消费者消费进度,使用consumerQueue文件顺序读取消息,然后使用获取到的偏移量从commitLog中快速查找到消息的消息体等重要信息,然后返回到消费者。

RocketMQ还可以根据消息时间戳查询消息,借助consumerQueue完成。因为consumerQueue文件是一个有序的文件,时间戳小的消息一定在时间戳大的消息的前面,所以RocketMQ借助二分查找来完成按照时间戳查找消息的操作。

3.IndexFile文件

RocketMQ除了可以顺序消费消息,还可以消息的key来查询,key查询借助indexFile来完成,indexFile的存储结构如下图:

RocketMQ存储文件浅析

indexFile文件是一个典型的hashtable实现,用链表来解决hash冲突。indexFile文件分为3部分。

第一部分header,其中包含了第一条索引的时间戳(beginTimestamp),最后一条消息的时间戳(endTimeStamp)起始物理偏移量(beginPhyoffset)结束物理偏移量(endPhyoffset)哈希条目已用个数(indexCount)

第二部分slot table,一共有500w个条目,每条大小4字节。表示Index Listed List中数据的偏移量。

第三部分是 Index Listed List,一共2000W条,存储的结构如上图。keyHash表示本条数据所对应Key的hash值,不保存key而保存hash值是为了保证每条数据固定大小;commitLogOffset表示需要查询的消息在commitLog中的偏移量,根据偏移量可以快速查找到commitLog中的消息;timestap表示本条消息与第一条消息时间戳的差值;next index offset是链表索引,表示链表前一个节点的偏移量。

添加索引时,计算key的哈希值,并找到对应的slot,并根据header中的indexCount在LinkedList中在添加一条索引数据。如果原slot中已经存在一条数据,将原slot中的数据写入新增数据的next index offset字段,并更新slot中的数据为新数据的偏移量。

根据key查询时,先计算key的hash值,然后找到slot中存储的偏移量,在根据偏移量依次轮询链表中的消息数据,找到对应的key之后返回数据即可。