kafka原理详解

1、什么是kafka?

是一个分布式发布-订阅消息系统和一个强大的队列,适合离线和在线消息消费,扩展性特别好。Kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。

Kafka为什么比其他的MQ都快,采用的是机制是顺序写入磁盘和Memory Mapped Files(内存映射文件)。

顺序写入:每个partition都是一个文件,kafka会把收到的message插入到文件末尾,每个consumer会对每个topic都有一个offset用来表示读取到了第几条数据。kafka会把所有的数据都保留下来,但是数据落到磁盘后,会随着数据增加,而选择要不要删除,kafka目前提供两种机制来删除,一种是基于时间的,数据默认保留7天,一种是基于partition文件大小的。

Kafka不是实时的写入硬盘,充分利用操作系统的分页存储来提高I/O操作。64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。

2、了解主要术语

kafka原理详解

producer

发布Message时候,必须把指定的消息发布到哪个topic上。

通过 default.replication.factor 对replica的数目进行配置,默认值为1,表示不对topic进行备份。如果配置为2,表示除了leader节点,对于topic里的每一个partition,都会有一个额外的备份。

通过request.required.acks来设置,选择是否等待消息commit(是否等待所有的”in sync replicas“都成功复制了数据)。acks in {0,1,-1},性能依次递减,健壮性依次增加,0:生产着不等待broker同步完成的消息,继续发送下一条;1:等待leader同步完成的消息并确认,才继续发送下一条;-1:等待follower的副本同步完成的消息并确认,才继续发送一条。

Producer可以通过acks参数指定最少需要多少个Replica确认收到该消息才视为该消息发送成功。acks的默认值是1,即Leader收到该消息后立即告诉Producer收到该消息,此时如果在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。

推荐的做法是,将acks设置为all或者-1,此时只有ISR中的所有Replica都收到该数据(也即该消息被Commit),Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。

负载均衡

producer发送到哪个partition的路由规则是自己定义的。默认路由规则:hash(key)%numPartitions,如果key为null则随机选择一个partition。

自定义路由:如果key是一个user id,可以把同一个user的消息发送到同一个partition,这时consumer就可以从同一个partition读取同一个user的消息。

异步批量发送

批量发送:配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据。

Message(消息)

Kafka 中的一条记录或数据单位。

主要由4部分组成:offset、timestamp、key、value;其中前两个是在kafka集群中生成,后两个是producer发送数据的时候产生。

一个Message由固定长度的header和一个变长的消息体body组成。

8 byte offset在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message。

4 byte message sizemessage大小。

4 byte CRC32用crc32校验message。

1 byte “magic"表示本次发布Kafka服务程序协议版本号。

1 byte “attributes"表示为独立版本、或标识压缩类型、或编码类型。

4 byte key length表示key的长度,当key为-1时,K byte key字段不填。

K byte key可选。

value bytes payload表示实际消息数据。

Broker(代理者)

一台kafka服务器称为broker,是一个物理概念。

Topic(主题)

每条消息属于一个topic,一个topic包含一个或多个Partition,建topic的时候,可以手动指定partition个数,个数与服务器个数相当,是一个逻辑概念。

Partition(分区)

每partition是一个有序的队列,每个partition只会在一个Broker上,且每个分区在物理上都对应着一个文件夹,该文件夹下存储着所有消息和索引文件。

kafka默认使用时hash进行分区,所以会出现不同的分区数据不一样的情况,但是partitioner是可以override的。

Partition包含多个segment,每个segment对应一个文件,segment可以手动指定大小,当segment达到阈值时,将不再写数据,每个segment都是大小相同的。

segment由多个不可变的记录组成。记录只会被append到segment中,不会被单独删除或修改,每个segment中的message数量不一定相等。当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

清除过期日志时,支持删除一个或多个segment,默认保留7天数据。

Producer生产的message按照一定的分组策略被发送到Broker中的partition中,如果出现message在内存中放不下的情况,就会放在partition目录下的文件中,partition目录名是topic的名称加上一个序号。在这个目录下有两个文件,一类是以log为后缀的文件,一类是以index为后缀的文件,其中log文件和index文件是一一对应关系,这对文件就是segment file,其中log中存放的是数据文件,也就是message,index存在的是元数据信息,记录的是Message的物理偏移量。

LogSegment文件命名的规则是,partition全局的第一个segment从0(20个0)开始,后续的每个文件(index,log)的文件名是上一个文件的最后一条消息offset值,为啥这样命名呢?这样的话,通过二分查找能更快的检索出下次消费消息index的位置,Consumer消费的消息是index的offset的值。

上图是index和log的关系图,左图里面存放的k-v关系,k是message在log中的编号,如1,3,6,8表示第1条,第3条,第6条,第8条,可index并没有为每条message都建立索引,而是采用稀疏矩阵的方式,每隔一定字节的数据建立一条索引,可减少索引文件大小,从而将索引文件放到内存中,但是就没办法对没有建立的索引的message进行一次性定位,这就需要对某一块的数据进行一次顺序扫描,value值表示该消息的物理偏移量。

kafka中的数据,局部有序,全局无序,消费某个topic中的数据,仅仅可以保证某个partition中的消费的message是有序的,但是多个partition之间是无序的。存在一种场景需求是需要全局有序,只有一个消费的生产者,只存在一个partition的时候可以实现的。但是如果多个生产者,一个partition,连局部有序都不能保证啦。

Consumer(消费者)

负责从对应Topic获取数据的进程。控制消息读取的速度和数量,consumer可以读取old offset的位置的数据。如果broker没有数据,则可能要pull多次忙等待,kafka可以配置consumer long pull一直等到有数据。

At most once:发送一次消息,无论成败,将不会再重新发送,读取消息,写log,处理消息。如果处理消息失败,log已经写入,则无法再次处理失败的消息。

At least once:消息至少发送一次,如果发送失败,则一直发,直到消息接受成功,读取消息,处理消息,写log。如果消息处理成功,写log失败,则消息会被处理两次。通常情况下是首选。

Exactly once:消息只发送一次,读取消息,同时处理消息并把result和log同时写入。这样保证result和log同时更新或同时失败,对应”Exactly once“。

Kafka默认保证at-least-once delivery,容许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,kafka提供了读取offset,实现也没有问题。

在Kafka 0.11.x版本引入了该语义,该博客对该语义提供了详细的解释:https://blog.csdn.net/zhangjun5965/article/details/78218169

Consumer Rebalance

如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据。

如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据。

如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。

Consumer group(消费者组)

每个consumer都属于一个特定的consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费,最好Consumer的数量等于partition数量。同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不同consumer group可以同时消费同一条消息。

Replication

一个partition的复制个数(replication factor)包括这个partition的leader本身。

所有对partition的读和写都通过leader。

Followers通过pull获取leader上log(message和offset)。

如果一个follower挂掉、卡住或者同步太慢,leader会把这个follower从”in sync replicas“(ISR)列表中删除。

当所有的”in sync replicas“的follower把一个消息写入到自己的log中时,这个消息才被认为是”committed“的。

如果针对某个partition的所有复制节点都挂了,Kafka默认选择最先复活的那个节点作为leader(这个节点不一定在ISR里)。

Leader选举

Kafka在Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。

在这种模式下,对于f+1个副本,一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,为了容忍f个副本的失败,“少数服从多数”的方式和ISR在commit前需要等待的副本的数量是一样的,但是ISR需要的总的副本的个数几乎是“少数服从多数”的方式的一半。

管理offset

0.8.2 Kafka引入native offset storage,将offset管理从ZK移出,并且可以做到水平扩展。实现原理其实也很自然,利用了Kafka自己的compacted topic,以consumer group,topic与Partition的组合作为key。所以offset的提交直接写到compacted topic中,但是又由于offset很重要还要求不能丢数据,所以消息的acking级别(由request.required.acks控制)设置为-1,producer等到所有ISR收到消息后才会得到ack(数据安全性最好,但是速度会有影响)。所以,Kafka又在内存中维护了<consumer group,topic,partition>的三元组来维护最新的offset信息,consumer来取最新offset信息的时候直接内存里拿即可。当然,敏锐性强的朋友一定想到了,kafka允许你快速的checkpoint最新的offset信息到磁盘上。

Distribution

Consumer Offset Tracking

High-level consumer记录每个partition所消费的maximum offset,并定期commit到offset manager(broker)。

Simple consumer需要手动管理offset。现在的Simple consumer Java API只支持commit offset到zookeeper。

Consumers and Consumer Groups

consumer注册到zookeeper

属于同一个group的consumer(group id一样)平均分配partition,每个partition只会被一个consumer消费。

当broker或同一个group的其他consumer的状态发生变化的时候,consumer rebalance就会发生。

kafka partition的选主机制

它在所有Broker中选出一个controller,所有partition的leader选举都由controller来决定。controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为此作为响应的Broker。

Kafka集群controller的选举如下:

每个broker都会在controller path上注册一个watch

当前的controller失败时,对应的controller path会自动消失,此时该watch被fire,所有活的broker都会去竞选成为新的controller(创建新的controller path),但是只会有一个竞选成功。

竞选成功的成为leader,竞选失败的则重新在新的controller path上注册watch。因为zookeeper的watch是一次性的,被fire一次之后就失效,所以需要重新注册。

kafka partition leader的选举过程如下(由controller执行):

从zookeeper中读取当前分区的所有的ISR集合,调用配置的分区选举算法选择分区leader。

kafka原理详解

一共有3个分区分别是0,1,2, replica 有2个:

partition 0 的leader在broker1, follower在broker2

partition 1 的leader在broker2, follower在broker0

partition 2 的leader在broker0, follower在brokder1

一个broker中不会出现两个一样的Partition,replica会被均匀的分布在各个kafka server(broker)上 。

在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。

任何broker或者consumer的增减都会触发所有的consumer的rebalance。

rebalance 算法

对每个被订阅的topic T做如下操作:

将topic T的所有partition排序,组成一个集合PT(所以,在同一broker的分区会聚集在一起)

将同一group的consumer排序,组成一个集合CG,Ci即为第i个consumer

N=size(PT)/size(CG),向上取整

将分区i*N 到 分区(i+1)*N -1分配给consumer Ci

从分区所有者注册表中删除CI所拥有的当前条目

将新分配的分区添加到分区所有者注册表