一 日志存储概述
LogSegment
1. 分区日志文件中包含很多的 LogSegment
2. Kafka 日志追加是顺序写入的
3. LogSegment 可以减小日志文件的大小
4. 进行日志删除的时候和数据查找的时候可以快速定位。
5. ActiveLogSegment 是活跃的日志分段,拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限。
日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log 三种类型。

每个 LogSegment 都有一个基准偏移量,表示当前 LogSegment 中第一条消息的 offset。
偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index、 00000000000000000000.timestamp、00000000000000000000.log)。
如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是121(偏移量从 0 开始)。
日志与索引文件
配置项默认值说明
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。
时间戳索引文件则根据时间戳查找对应的偏移量。
Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,并不保证每一个消息在索引文件中都有对应的索引项。
每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。
通过修改 log.index.interval.bytes 的值,改变索引项的密度。
切分文件
当满足如下几个条件中的其中之一,就会触发文件的切分:
1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes 参数的默认值为 1073741824,即 1GB。
2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes配置的值。 log.index.size.max.bytes 的默认值为 10485760,即 10MB。
4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE ,即要追加的消息的偏移量不能转变为相对偏移量。
为什么是 Integer.MAX_VALUE ?
在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。相对偏移量和物理地址。
相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
4 个字节刚好对应 Integer.MAX_VALUE ,如果大于 Integer.MAX_VALUE ,则不能用 4 个字节进行表示了。
索引文件切分过程
索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值
当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。
这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。
二 日志存储
1 索引
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。
log文件几个关键信息如下:
(1)offset是逐渐增加的整数,每个offset对应一个消息的偏移量。
(2)position:消息批字节数,用于计算物理地址。
(3)CreateTime:时间戳。
(4)magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。
(5)compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1-GZIP、2-snappy、3-lz4。
(6)crc:对所有字段进行校验后的crc值。
偏移量
1. 位置索引保存在index文件中
2. log日志默认每写入4K(log.index.interval.bytes设定的),会写入一条索引信息到index文件中,因此索引文件是稀疏索引,它不会为每条日志都建立索引信息。
3. log文件中的日志,是顺序写入的,由message+实际offset+position组成
4. 索引文件的数据结构则是由相对offset(4byte)+position(4byte)组成,由于保存的是相对第一个消息的相对offset,只需要4byte就可以了,可以节省空间,在实际查找后还需要计算回实际的offset,这对用户是透明的。
稀疏索引,索引密度不高,但是offset有序,二分查找的时间复杂度为O(lgN),如果从头遍历时间复杂度是O(N)。
示意图如下

通过如下命令解析 .index 文件kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log | head 注意:offset 与 position 没有直接关系,因为会删除数据和清理日志。
思考:如何查看偏移量为23的消息?
Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。
时间戳
时间戳索引文件,它的作用是可以让用户查询某个时间段内的消息,它一条数据的结构是时间戳(8byte)+相对offset(4byte),如果要使用这个索引文件,首先需要通过时间范围,找到对应的相对offset,然后再去对应的index文件找到position信息,然后才能遍历log文件,它也是需要使用上面说的index文件的。
但是由于producer生产消息可以指定消息的时间戳,这可能将导致消息的时间戳不一定有先后顺序,因此尽量不要生产消息时指定时间戳。
注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的。因为数据的写入是各自追加。
思考:查找时间戳为 1557554753430 开始的消息?
1. 查找该时间戳应该在哪个日志分段中。将1557554753430和每个日志分段中最大时间戳largestTimeStamp逐一对比,直到找到不小于1557554753430所对应的日志分段。日志分段中的largestTimeStamp的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改时间。
(为什么不直接选最近修改时间呢?
因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。)
2. 查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址。
3. 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。
2 清理
Kafka 提供两种日志清理策略:
日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除
日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。
Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值: delete ,还可以选择 compact 。
主题级别的配置项是 cleanup.policy 。
日志删除
日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天, log.retention.ms 优先级最高。
Kafka 依据日志分段中最大的时间戳进行定位。
首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。(为什么不直接选最近修改时间呢? 与之前理由一致)
删除过程
1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。
2. 这些日志分段所有文件添加 上 .delete 后缀。
3. 交由一个以 "delete-file" 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过 file.delete.delay.ms 进行设置
如果活跃的日志分段中也存在需要删除的数据时?
Kafka 会先切分出一个新的日志分段作为活跃日志分段,该日志分段不删除,删除原来的日志分段。
先腾出地方,再删除。
日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日志分段的大小由 log.segment.bytes 进行设定。
删除过程
1. 计算需要被删除的日志总大小 (当前日志文件大小(所有分段)减去retention值)。
2. 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
3. 执行删除。
根据日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段。
注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。

删除过程
1. 从头开始遍历每个日志分段,日志分段1的下一个日志分段的起始偏移量为21,小于logStartOffset,将日志分段1加入到删除队列中
2. 日志分段 2 的下一个日志分段的起始偏移量为35,小于 logStartOffset,将 日志分段 2 加入到删除队列中
3. 日志分段 3 的下一个日志分段的起始偏移量为57,小于logStartOffset,将日志分段3加入删除集合中
4. 日志分段4的下一个日志分段的起始偏移量为71,大于logStartOffset,则不进行删除。
日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留,而不是基于粗粒度的基于时间的保留。
对于具有相同的Key,而数据不同,只保留最后一条数据,前面的数据在合适的情况下删除。
日志压缩方式的实现细节
主题的 cleanup.policy 需要设置为compact。
Kafka的后台线程会定时将Topic遍历两次:
1. 记录每个key的hash值最后一次出现的偏移量
2. 第二次检查每个offset对应的Key是否在后面的日志中出现过,如果出现了就删除对应的日志。
日志压缩允许删除,除最后一个key之外,删除先前出现的所有该key对应的记录。在一段时间后从日志中清理,以释放空间。
注意:日志压缩与key有关,确保每个消息的key不为null。
压缩是在Kafka后台通过定时重新打开Segment来完成的,Segment的压缩细节如下图所示:

日志压缩可以确保:
1. 任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使用Topic的min.compaction.lag.ms属性来保证消息在被压缩之前必须经过的最短时间。也就是说,它为每个消息在(未压缩)头部停留的时间提供了一个下限。可以使用Topic的max.compaction.lag.ms属性来保证从收到消息到消息符合压缩条件之间的最大延时
2.消息始终保持顺序,压缩永远不会重新排序消息,只是删除一些而已
3.消息的偏移量永远不会改变,它是日志中位置的永久标识符
4.从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。另外,如果使用者在比Topic的log.cleaner.delete.retention.ms短的时间内到达日志的头部,则会看到已删除记录的所有delete标记。保留时间默认是24小时
默认情况下,启动日志清理器,若需要启动特定Topic的日志清理,请添加特定的属性。配置日志
清理器,这里为大家总结了以下几点:
1. log.cleanup.policy 设置为 compact ,Broker的配置,影响集群中所有的Topic。
2. log.cleaner.min.compaction.lag.ms ,用于防止对更新超过最小消息进行压缩,如果没有设置,除最后一个Segment之外,所有Segment都有资格进行压缩
log.cleaner.max.compaction.lag.ms ,用于防止低生产速率的日志在无限制的时间内不压缩。
Kafka的日志压缩原理并不复杂,就是定时把所有的日志读取两遍,写一遍,而CPU的速度超过磁盘完全不是问题,只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内,那么它的性能就是可以接受的。