kafka的消息存储

消息的保存路径:

默认情况下是保存在 /temp/kafka-log中

存储方式

使用日志文件+索引的方式

消息写入

使用了顺序写入和零拷贝来提升写入性能,consumer和producer都是使用的二进制数据,避免了格式的转化

日志的分片

当日志文件过大的时候,会严重影响性能,于是当日志文件达到一定的大小的时候会对日志文件进行分片,这个大小默认是1GB,可以通过
long.segment.bytes这个配置来调整,日志的片段叫做segment
segment由两大部分组成:索引(一种是offset索引,一种是时间索引)和数据,他们成对出现后缀分别是index和log

segment命名规则:patition全局的第一个sequment从0开始,后续的每个segment文件名为上一个segment的最后一条消息的offset+1.数值最大为64位long大小,没有的位数补零

如何查找消息?

kafka的消息存储
如图所示,左边一列是offset,右边一列是position(物理偏移量4053~4559的position都是一样的),比如说查找offset为4055的消息。那么会去找消息对应的segment,然后通过二分法去找index文件对应的offset区间,会找到4053,然后根据4053找到对应的position80899,然后去日志文件里面找到80899,依次向下查找,找到offset为4055的消息

long文件内容分析

offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize: -1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: message_5371

createTime表示创建时间
keysize和valuesize 分别表示键和值的大小
compresscodec表示压缩类型
payload表示消息的具体内容

日志清除策略

  1. 根据日志保留时间清除,当日志保存时间超过指定时间时清除,默认是7天
  2. 根据存储文件的大小,当文件大小超出阈值的时候清除最早的日志文件

kafka会启动一个后台线程,定期的去检查是否存在可以清除的数据

通过log.retention.bytes和log.retention.hours这两个参数来设置,只要满足其中一个就会执行清楚操作