Kafka的架构及其原理

Kafka的架构及其原理

概述

  1. Kafka最初由LinkedIn公司开发的分布式消息系统,后来才贡献给了Apache,底层是由Scala编写
  2. 具有高性能、持久化、多副本备份、横向扩展的能力
  3. 基于发布-订阅模式(push-pull),常用作解耦、削峰、异步处理等
  4. 下文,会大概介绍Kafka的基本原理、存储机制、复制原理、同步原理、可靠性和持久性、高性能保证等

kafka体系架构

Kafka的架构及其原理

解释:

  • 包含若干Producer(比如:服务器日志、业务数据、前端产生的page view等等),push模式生产数据
  • 包含若干Broker(kafka支持横向扩展,broker越多的话,吞吐量越大)
  • 若干Consumer(Group),pull模式消费数据
  • one Zookeeper集群(集群管理、选举leader,rebalance等操作)
  • borker相关信息存在Zookeeper中目录:/borker/topics/[topic]/partitions/[partition]/state

Kafka的架构及其原理

  1. 角色认识
序号 角色名称 解释
1 producer 消息生产者,发布消息到 kafka 集群的终端或服务
2 broker kafka 集群中包含的服务器节点,一个kafa节点就是一个broker,一个或者多个broker组成一个kafka集群
3 topic topic是逻辑上的概念,每条发布到 kafka 集群的消息属于的类别,即 kafka 是按照 topic对信息进行归类的
4 partition partition是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition,分区是kafka负载均衡和容错恢复的单位,每个partition内部是有序的
5 consumer 从 kafka 集群中消费消息的终端或服务
6 Consumer group 每个consumer属于一个消费者组,组内竞争、组间共享,即每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
7 replica partition 的副本,保障 partition 的高可用
8 leader replica 中的一个角色, producer 和 consumer 只跟 leader 交互
9 follower replica 中的一个角色,从 leader 中复制数据
10 controller kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover
11 zookeeper kafka 通过 zookeeper 来存储集群的 meta 信息
  1. topic & partition
    Kafka的架构及其原理
  • topic可以认为是一类消息,每个topic被分成多个partition,每个partition在存储层面是append log文件
  • 任何发布到此partition的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型的数字,它唯一标记一条消息
  • 每条消息都被append到partition中,是顺序写磁盘,因此效率非常高(不用磁道寻址,顺序写磁盘会比随机写内存还要快)
  • 消息发送的策略
    :发送消息时候,会根据kafka.producer.Partitioner来决定发送到哪个partition(实际情况:当消息有key时,会按照key来hash进行发送,没有消息key则会round-robin发送)
- 那topic的分区怎么指定呢
1. 配置文件
vim $KAFKA_HOME/config/server.properties
num.partitions=3 

或者(第二种指定会覆盖第一种)
2. bin/kafka-topics.sh --create --zookeeper xx.xx.xx.xx:2181,...,... --replication-factor 3 --partitions 3 --topic HeartbeatMessage

高可靠性

  1. kafka的高可靠性保障来源于其健壮的副本策略(replication),可以在$KAFKA_HOME/config/server.properties中配置default.replication.refactor
  2. 下面从kafka的文件结构、复制原理、同步方式、ISR、HW、leader选举、数据的可靠性和持久性保证多维度来介绍

文件结构

  1. 逻辑上:Topic
  2. 物理上:Partition
  3. 每个partition在物理上由多个segment组成
  • 举例说明:创建主题test_topic,3副本 ,3分区
    具体的数据在磁盘上存放的目录是(server.properties里面配置的):/tmp/kafka-logs

可以看到该目录里已经生成3个目录(命名规则:topic-分区序号):

drwxr-xr-x 2 root root 4096 Jan 25 15:10 test_topic-0
drwxr-xr-x 2 root root 4096 Jan 25 15:10 test_topic-1
drwxr-xr-x 2 root root 4096 Jan 25 15:10 test_topic-2
  1. Segment(.index文件+.log文件)
  • 提到segment,为什么要这样设计呢,想想你的producer会一直往分区里面放入数据,partition文件会越来越大的,那么消息文件的维护以及历史数据的清除也会很不方便,所以才会对partition分段
  • 每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(段)数据文件中(每个segment 文件中消息数量不一定相等)
  • 这样每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定
  • segment文件由两部分组成,分别为 “.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命令规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

  • 那么.index索引文件和.log数据文件的关系是怎么样的呢

      举例说明:segment:00000000000000170410的“.index”文件和“.log”文件的对应关系如下图:
    

Kafka的架构及其原理

解释1:

  • [为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。索引包含两部分,分别是baseOffset,还有position]
    • [ baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如:kafka使用的是varint]
    • [ position:在segment中的绝对位置。查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message ]

举例:假设读取offset=170416的消息,首先查找segment文件

  • 首先通过二分法确定
    数据一定在00000000000000170410.log中
  • 00000000000000170410.log中第一条数据的offset为170411
  • 所以00000000000000170410.index索引文件中,我们要找的数据的baseOffset为6
  • 去index文件中,是稀疏矩阵哦,很容易确定是在4-8之间嘛,之后,从4到8遍历找数据就行

问题:什么时候知道消息读完了呢

消息的数据结构

消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止

Kafka的架构及其原理

复制原理和同步方式

  1. Kafka中topic的每个partition有一个预写式的日志文件,虽然partition可以继续细分为若干个segment文件,但是对于上层应用来说可以将partition看成最小的存储单元(一个有多个segment文件拼接的“巨型”文件),每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中

Kafka的架构及其原理

  1. 上图有两个新名词:HW和LEO。这里先介绍下LEO,LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。HW是HighWatermark的缩写,是指consumer能够看到的此partition的位置,这个涉及到多副本的概念,这里先提及一下,下面再详表。

  2. 言归正传,为了提高消息的可靠性,Kafka每个topic的partition有N个副本(replicas),其中N(大于等于1)是topic的复制因子(replica fator)的个数。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的日志能有序地写到其他节点上,N个replicas中,其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。

  3. 如下图所示,Kafka集群中有4个broker, 某topic有3个partition,且复制因子即副本个数也为3:

Kafka的架构及其原理

  1. Kafka提供了数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表(isr)中选举一个副本为leader,或者说follower追赶leader数据。leader负责维护和跟踪ISR(In-Syn Replicas缩写,表示副本同步队列,具体可参考下面的)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,如果follower“落后”太多或者失效,leader将会把它从ISR中删除(有个参数可以控制这个的)

ISR

  1. 上面我们涉及到ISR (In-Sync Replicas),这个是指副本同步队列。副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性
  2. 默认情况下Kafka的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3
  3. 所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(延迟时间 replica.lag.time.max.ms, ),超过阈值会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR
  4. 注意:ISR中包括:leader和follower
  5. 上面还涉及到一个概念,即HW。HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broker的读取请求,没有HW的限制。
  6. 下图详细的说明了当producer生产消息至broker后,ISR以及HW和LEO的流转过程:
    Kafka的架构及其原理
  7. 由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
  8. Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个Zookeeper的节点进行维护:
  • Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas
  • leader来维护:leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中
  • state数据结构
state数据结构:
{"controller_epoch":5,           ##表示kafka集群中的*控制器选举次数
"leader":1,                      ##当前partition的leader所在的borker id
"version":1,                     ##版本编号默认为1
"leader_epoch":6,                ##leader选举次数
"isr":[2,1,3]                    ##当前partition的In-sync replica,副本组的borker id列表
}

数据可靠性和持久性保证

  1. 生产端的可靠性
  • 当producer向leader发送数据时,可以通过request.required.acks=all

  • 参数来设置数据可靠性的级别:

    • 1(默认):这意味着producer在ISR中的leader已成功收到数据并得到确认。如果leader宕机了,则会丢失数据。
    • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
    • -1或者all:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员由于某些情况会增加也会减少,最少就只剩一个leader),这样就变成了acks=1的情况(所以一般会搭配min.insync.replicas这个控制isr中副本数量的参数一起使用才能有效保障生产端不丢数据)
  • 详细说明:这边对acks=1和-1的两种情况进行详细分析:

    • request.required.acks=1

    • 问题分析:producer发送数据到leader,leader写本地日志成功,返回客户端成功;此时ISR中的副本还没有来得及拉取该消息,leader就宕机了,那么此次发送的消息就会丢失
      Kafka的架构及其原理

    • request.required.acks=-1

    • 数据发送到leader, ISR的follower全部完成数据同步后,leader此时挂掉,那么会选举出新的leader,数据不会丢失
      Kafka的架构及其原理

    • 考虑问题:数据发送到leader后 ,部分ISR的副本同步,部分没有,leader此时挂掉。比如follower1和follower2都有可能变成新的leader,leader挂了, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复,如下图:
      Kafka的架构及其原理

建议配置:
request.required.acks=all或者-1
min.insync.replicas=2或者更多
这样假设isr中副本数不够2时,会报错
org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required
生产端会接受到报错信息,重新写,从而避免了生产端丢数据

  1. HW的存在
  • 考虑上面提到的问题(即acks=-1,部分ISR副本同步)中的另一种情况,如果在Leader挂掉的时候,follower1同步了消息4,5,follower2同步了消息4,与此同时follower2被选举为leader,那么此时follower1中的多出的消息5该做如何处理呢?
  • 这里就需要HW的协同配合了。如前所述,一个partition中的ISR列表中,leader的HW是所有ISR列表里副本中最小的那个的LEO。类似于木桶原理,水位取决于最低那块短板

Kafka的架构及其原理

  • 如上图,某个topic的某partition有三个副本,分别为A、B、C。A作为leader肯定是LEO最高,B紧随其后,C机器由于配置比较低,网络比较差,故而同步最慢。这个时候A机器宕机,这时候如果B成为leader,假如没有HW,在A重新恢复之后会做同步(makeFollower)操作,在宕机时log文件之后直接做追加操作,而假如B的LEO已经达到了A的LEO,会产生数据不一致的情况,所以使用HW来避免这种情况。
  • A在做同步操作的时候,先将log文件截断到之前自己的HW的位置,即3,之后再从B中拉取消息进行同步。
  • 如果失败的follower恢复过来,它首先将自己的log文件截断到上次checkpointed时刻的HW的位置,之后再从leader中同步消息。leader挂掉会重新选举,新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息
  1. leader的选举
  • Kafka在Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)
  • Kafka所使用的leader选举算法更像是微软的PacificA算法

Kafka的架构及其原理

  • 上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

    • 等待ISR中任意一个replica“活”过来,并且选它作为leader
    • 选择第一个“活”过来的replica(并不一定是在ISR中)作为leader
  • 这就需要在可用性和一致性当中作出一个简单的抉择。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中所有的replica都无法“活”过来了,或者数据丢失了,这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保障已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源。默认情况下,Kafka采用第二种策略,即unclean.leader.election.enable=true,也可以将此参数设置为false来启用第一种策略(需要权衡)

  • unclean.leader.election.enable这个参数对于leader的选举、系统的可用性以及数据的可靠性都有至关重要的影响。根据实际情况使用

  1. broker端的可靠性
  • 配置多分区、多副本就行
  1. consumer端的可靠性
  • 比如说:可以将业务处理+提交offset绑定成一个事务,要么全成功,要么全失败

消息传输过程的可靠性保障

  1. 三层语义
  • At most once: 消息可能会丢,但绝不会重复传输
  • At least once:消息绝不会丢,但可能会重复传输
  • Exactly once:每条消息肯定会被传输一次且仅传输一次
  1. Kafka的消息传输保障机制非常直观。当producer向broker发送消息时,一旦这条消息被commit,由于副本机制(replication)的存在,它就不会丢失。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经提交(commit)。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以retry多次,确保消息已经正确传输到broker中,所以目前Kafka实现的是at least once

  2. consumer读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于at least once

  3. 消息去重

  • 建议业务方根据自身的业务特点进行去重,比如业务消息本身具备幂等性,或者借助Redis等其他产品进行去重处理
  1. 高可靠性配置
topic的配置:replication.factor>=3,即副本数至少是3个;2<=min.insync.replicas<=replication.factor
broker的配置:leader的选举条件unclean.leader.election.enable=false
producer的配置:request.required.acks=-1(all),producer.type=sync

高性能

  1. 生产端(两个技术进行写入的优化)
  • 顺序IO

  • MMFile

  • 顺序写入
    因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最“讨厌”随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O

Kafka的架构及其原理

上图就展示了Kafka是如何写入数据的,每一个Partition其实都是一个文件,收到消息后Kafka会把数据插入到文件末尾(虚框部分)。
这种方法有一个缺陷——没有办法删除数据,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,可以设置历史数据保存日期的

  • MMF(Memory Mapped Files)
    • 即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。
    • Memory Mapped Files(后面简称mmap)也被翻译成内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)
      Kafka的架构及其原理
    • 通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。
    • 使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)
    • 问题是也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘
    • 解决:Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);写入mmap之后立即返回Producer不调用flush叫异步(async)
    • mmap其实是Linux中的一个函数就是用来实现内存映射的,其实利用的是Java NIO,提供了一个mappedbytebuffer类可以用来实现内存映射
  1. 消费端(读取数据)
  • 顺序读取

    • 每个消费者(Consumer)对每个Topic都有一个offset用来表示读取到了第几条数据
    • 读取的时候也是按照每个partition来进行顺序读取的
      Kafka的架构及其原理
  • 思考:如何提高Web Server静态文件的速度
    仔细想一下,一个Web Server传送一个静态文件,如何优化?答案是zero copy

  • 传统模式下我们从硬盘读取一个文件是这样的

Kafka的架构及其原理

先复制到内核空间(read是系统调用,放到了DMA,所以用内核空间)->然后复制到用户空间(1,2)->从用户空间重新复制到内核空间(你用的socket是系统调用,所以它也有自己的内核空间)->最后发送给网卡(3、4)

  • zero copy

Kafka的架构及其原理

- Zero Copy中直接从内核空间(DMA的)发送网卡

- Java的NIO提供了FileChannle,它的transferTo、transferFrom方法就是Zero Copy,Netty、Ngnix也是使用的zero copy

Zero Copy对应的是sendfile这个函数(以Linux为例),这个函数接受

out_fd作为输出(一般及时socket的句柄)
in_fd作为输入文件句柄(用mmap作为文件读写方)
off_t表示in_fd的偏移(offset从哪里开始读取)
size_t表示读取多少个(max.poll.size)