Kafka工作流程-Kafka 高可靠性存储

 

1.Kafka 高可靠性存储

    Kafka 的高可靠性的保障来源于其健壮的副本(replication)策略。通过调节其副本相关 参数,可以使得 Kafka 在性能和可靠性之间运转的游刃有余。Kafka 从 0.8.x 版本开始提供 partition 级别的复制,replication 的数量可以在$KAFKA_HOME/config/server.properties 中配 置(default.replication.refactor)。

    Kafka 文件存储机制

    Kafka 中消息是以 topic 进行分类的,生产者通过 topic 向 Kafka broker 发送消息,消费者通过 topic 读取数据。

    为了便于说明问题,假设这里只有一个 Kafka 集群,且这个集群只有一个 Kafka broker, 即只有一台物理机。在这个 Kafka broker 中配置($KAFKA_HOME/config/ server.properties 中)log.dirs=/tmp/kafka-logs,以此来设置 Kafka 消息文件存储目录,与此同时创建一个 topic: topic_zzh_test,partition 的数量为 4($KAFKA_HOME/bin/kafka-topics.sh –create – zookeeper localhost:2181 –partitions 4 –topic topic_zzh_test –replication-factor 4)。那么我 们此时可以在/tmp/kafka-logs 目录中可以看到生成了 4 个目录:

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0 

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1 

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2 

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3

    在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录, partition 的名称规则为:topic 名称+有序序号,第一个序号从 0 开始计,最大的序号为 partition 数量减 1,partition 是实际物理上的概念,而 topic 是逻辑上的概念。

    partition 还可以细分为 segment,这个 segment 又是什么?

    如果就以 partition 为最小存储单位,我们可以想象当 Kafka producer 不断发送消息,必 然会引起 partition 文件的无限扩张,这样对于消息文件的维护以及已经被消费的消息的清理 带来严重的影响,所以这里以 segment 为单位又将 partition 细分。每个 partition(目录)相当于 一个巨型文件被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment 文件中 消息数量不一定相等),这种特性也方便 old segment 的删除,即方便已被消费的消息的清 理,提高磁盘的利用率。每个 partition 只需要支持顺序读写就行,segment 的文件生命周期 由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。

    segment 文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为 segment 索引文件和数据文件(引入索引文件的目的就是便于利用二分查找快速定位 message 位置)。 这两个文件的命令规则为:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文 件名为上一个 segment 文件最后一条消息的 offset 值,数值大小为 64 位,20 位数字字符长 度,没有数字用 0 填充,如下:

以上面的 segment 文件为例,展示出 segment:00000000000000170410 的“.index”文 件和“.log”文件的对应的关系,如下图:

Kafka工作流程-Kafka 高可靠性存储

    如上图所示,“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的 消息,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。其中以“.index” 索引文件中的元数据[3, 348]为例,在“.log”数据文件表示第 3 个消息,即在全局 partition 中表示 170410+3=170413 个消息,该消息的物理偏移地址为 348。

    那么如何从 partition 中通过 offset 查找 message 呢?

    以上图为例,读取 offset=170418 的消息,首先查找 segment 文件,其中 00000000000000000000.index 为最开始的文件,第二个文件为 00000000000000170410.index (起始偏移为 170410+1=170411),而第三个文件为 00000000000000239430.index(起始偏 移为 239430+1=239431),所以这个 offset=170418 就落到了第二个文件之中。其他后续文 件可以依次类推,以其偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到 具体文件位置。其次根据 00000000000000170410.index 文件中的[8,1325]定位到 00000000000000170410.log 文件中的 1325 的位置进行读取。

    要是读取 offset=170418 的消息,从 00000000000000170410.log 文件中的 1325 的位置进 行读取,那么怎么知道何时读完本条消息,否则就读到下一条消息的内容了?

    这个就需要联系到消息的物理结构了,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、 key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大 小,即读取到哪里截止。

 

2. 复制原理和同步方式

    Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为 若干个 segment 文件,但是对于上层应用来说可以将 partition 看成最小的存储单元(一个由 多个 segment 文件拼接的“巨型”文件),每个 partition 都由一些列有序的、不可变的消息 组成,这些消息被连续的追加到 partition 中。

Kafka工作流程-Kafka 高可靠性存储

    图 2-13 中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念。

    为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas), 其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现 故障自动转移,当 Kafka 集群中一个 broker 失效情况下仍然保证服务可用。在 Kafka 中发生 复制时确保 partition 的日志能有序地写到其他节点上,N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 处理 partition 的所有读写请求,与此同时,follower 会被 动定期地去复制 leader 上的数据。

    如下图所示,Kafka 集群中有 4 个 broker, 某 topic 有 3 个 partition,且复制因子即副本个 数也为 3:

Kafka工作流程-Kafka 高可靠性存储

    Kafka 提供了数据复制算法保证,如果 leader 发生故障或挂掉,一个新 leader 被选举并 被接受客户端的消息成功写入。Kafka 确保从同步副本列表中选举一个副本为 leader,或者说 follower 追赶 leader 数据。leader 负责维护和跟踪 ISR(In-Sync Replicas 的缩写,表示副 本同步队列,具体可参考下节)中所有 follower 滞后的状态。当 producer 发送一条消息到 broker 后,leader 写入消息并复制到所有 follower。消息提交之后才被成功复制到所有的同 步副本。消息复制延迟受最慢的 follower 限制,重要的是快速检测慢副本,如果 follower“落 后”太多或者失效,leader 将会把它从 ISR 中删除。

 

3. ISR

    ISR(In-Sync Replicas),副本同步队列。ISR 中包括 leader 和 follower。副本数对 Kafka 的吞吐率是有一定的影响,但极大的增强了可用性。默认情况下 Kafka 的 replica 数量为 1, 即每个 partition 都有一个唯一的 leader,为了确保消息的可靠性,通常应用中将其值(由 broker 的参数 offsets.topic.replication.factor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas) 统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(包括延迟时replica.lag.time.max.ms 和延迟条数 replica.lag.max.messages两个维度, 当前的0.10.x及以上版本中只支持replica.lag.time.max.ms 这个维度),任意一个超过阈值都会把 follower 剔除出 ISR, 存入OSR(Outof-Sync Replicas) 列表,新加入的 follower 也会先存放在 OSR 中。

    AR=ISR+OSR

    Kafka 0.10.x 版 本 后 移 除 了 replica.lag.max.messages 参 数 , 只 保 留 了replica.lag.time.max.ms 作为 ISR 中副本管理的参数。为什么这样做呢?

    replica.lag.max.messages 表示当前某个副本落后 leaeder 的消息数量超过了这个参数的 值,那么 leader 就会把 follower 从 ISR 中删除。假设设置 replica.lag.max.messages=4,那么 如果 producer 一次传送至 broker 的消息数量都小于 4 条时,因为在 leader 接受到 producer 发送的消息之后而 follower 副本开始拉取这些消息之前,follower 落后 leader 的消息数不会 超过 4 条消息,故此没有 follower 移出 ISR,所以这时候 replica.lag.max.message 的设置似乎 是合理的。但是 producer 发起瞬时高峰流量,producer 一次发送的消息超过 4 条时,也就是 超过 replica.lag.max.messages,此时 follower 都会被认为是与 leader 副本不同步了,从而被踢出了 ISR。但实际上这些 follower 都是存活状态的且没有性能问题,那么在之后追上 leader,并被重新加入了 ISR,于是就会出现它们不断地剔出 ISR 然后重新回归 ISR,这无疑增加了无谓的性能损耗。而且这个参数是 broker 全局的。设置太大了,影响真正“落后”follower 的移除;设置的太小了,导致 follower 的频繁进出。无法给定一个合适的 replica.lag.max.messages 的值,故此,新版本的 Kafka 移除了这个参数。

    HW,HighWatermark 的缩写,俗称高水位,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费,这 样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于 来自内部 broker 的读取请求,没有 HW 的限制。

    (一个 partition 的副本分为 Leader 和 Follower,Leader 和 Follower 都维护了 HW 和 LEO, 而 partition 的读写都在 Leader 完成,Leader 的 HW 是所有 ISR 列表里副本中最小的那个的 LEO,当 Leader 新写入消息后,Leader 的 LEO 更新到加入新消息后的位置,Leader 的 HW 仍在原位置,当所有的 Follower 都同步完成后,HW 更新到最新位置,此时最新写入的消息 才能被 Consumer 消费)

图 2-15 详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过 程:

Kafka工作流程-Kafka 高可靠性存储

    由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上, 同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式极 大的影响了吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据。而 Kafka 的这种使用 ISR 的方式则很好的均衡了确保 数据不丢失以及吞吐率。

    Kafka 的 ISR 的管理最终都会反馈到 Zookeeper 节点上。具体位置为: /brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个 Zookeeper 的节点 进行维护:

    Controller:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条 件下,Controller 下的 LeaderSelector 会选举新的 leader,将 ISR 和新的 leader_epoch 及 controller_epoch 写入 Zookeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。

    Leader:Leader 有单独的线程定期检测 ISR 中 Follower 是否脱离 ISR,如果发现 ISR 变 化,则会将新的 ISR 的信息返回到 Zookeeper 的相关节点中。

 

4. 数据可靠性和持久性保证

    当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

    1(默认):producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据;

    0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;

    -1:producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才返回 ack, 数据一般不会丢失,延迟时间长但是可靠性最高。但是这样也不能保证数据不丢失,比如当 ISR 中只有 leader 时(前面 ISR 那一节讲到,ISR 中的成员由于某些情况会增加也会减少, 最少就只剩一个 leader),这样就变成了 acks=1 的情况;

    如果要提高数据的可靠性,在设置 request.required.acks=-1 的同时,也要 min.insync.replicas 这个参数(可以在 broker 或者 topic 层面进行设置)的配合,这样才能发挥 最大的功效。min.insync.replicas 这个参数设定 ISR 中的最小副本数是多少,默认值为 1,当 且仅当 request.required.acks 参数设置为-1 时,此参数才生效。如果 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常:org.apache.kafka. common.errors. NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

    接下来对 ack=1 和-1 的两种情况进行详细分析:

(1) Request.required.acks = 1

    producer 发送数据到 Leader,Leader 写本地日志成功,返回客户端成功;此时 ISR 中的 副本还没有来得及拉取该消息,Leader 就宕机了,那么此次发送的消息就会丢失 producer 发送数据到 Leader,Leader 写本地日志成功,返回客户端成功;此时 ISR 中的副本还没有 来得及拉取该消息,Leader 就宕机了,那么此次发送的消息就会丢失。

Kafka工作流程-Kafka 高可靠性存储

(2) Request.required.acks = -1

    同步(Kafka 默认为同步,即 producer.type=sync)的发送模式,replication.factor>=2 且 min.insync.replicas>=2 的情况下,不会丢失数据。

    有两种典型情况。acks=-1 的情况下(如无特殊说明,以下 acks 都表示为参数 request.required.acks),数据发送到 leader,ISR 的 Follower 全部完 1 成数据同步后,Leader 此时挂掉,那么会选举出新的 Leader,数据不会丢失。

Kafka工作流程-Kafka 高可靠性存储

    acks=-1 的情况下,数据发送到 leader 后 ,部分 ISR 的副本同步,leader 此时挂掉。比 如 follower1 和 follower2 都有可能变成新的 leader, producer 端会得到返回异常,producer 端 会重新发送数据,数据可能会重复。

Kafka工作流程-Kafka 高可靠性存储

    当然图 2-17 中所示,如果在 leader crash 的时候,follower2 还没有同步到任何数据,而且 follower2 被选举为新的 leader 的话,这样消息就不会重复。

    考虑图 2-18(即 acks=-1,部分 ISR 副本同步)中的另一种情况,如果在 Leader 挂掉的 时候,follower1 同步了消息 4,5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader, 那么此时 follower1 中的多出的消息 5 该做如何处理呢?

    这里就需要 HW 的协同配合了。如前所述,一个 partition 中的 ISR 列表中,leader 的 HW 是所有 ISR 列表里副本中最小的那个的 LEO。类似于木桶原理,水位取决于最低那块 短板。

Kafka工作流程-Kafka 高可靠性存储

    如图 2-19,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定 是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时 候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步 (makeFollower)操作,在宕机时 log 文件之后直接做追加操作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。

A 在做同步操作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。

    如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻 的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送“指 令”让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。

注意:当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时 并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举,新的 leader 会发送“指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。