kafka 概述


kafka 概述

 

*************************************************

基本概念

 

       kafka 概述

kafka架构包含若干zookeeper、broker、producer、consumer,各节点的作用如下:

zookeeper:存储集群的元数据并负责对其进行管理,选举broker控制器等;

broker:存储消息,由broker控制器向zookeeper上传集群元数据,接受客户端的数据发送及获取请求等;

producer:向broker发送消息,定时发送心跳,并从broker获取所需的元数据信息;

consumer:从broker拉取消息消费,定时发送心跳,并从获取所需的元数据信息;

 

          kafka 概述

topic:消息所属的主题,一个topic可包含多个partition;

partition:分区,一个分区可有多个副本,分区中的副本由leader副本、follower副本组成,follower同步leader副本的数据;

副本:实际存储消息的目录,可包含多个日志分段;

日志分段:由多个log、索引文件等组成,实际存储消息的文件

 

副本集合AR:某个分区的所有副本的集合;

同步副本集合ISR:与leader副本处于同步状态的副本集合,包含leader副本;

非同步副本集合OSR:不与leader副本处于同步状态的follower副本集合,follower副本落后leader副本太多时,会从ISR中剔除,转入到OSR集合中,AR=ISR+OSR

 

                           kafka 概述

高水位HW:consumer可以消费HW之前的消息,HW及其之后的消息不能消费,HW的值为处于同步副本集合中最小的LEO;

LEO:broker即将写入的下一条消息的偏移量

 

************************************************

producer 消息发送

 

                   kafka 概述

消息发送流程

主线程发送的消息经拦截器过滤(拦截器可不配置)、序列化器将key value序列化后、然后通过分区器将消息发送到消息累加器的对应分区;

消息累加器缓存producer发送的消息,producer为消息发送的分区创建队列,队列中的每个节点元素producerBatch可存储一到多个消息(producerRecord),消息累加器的空间大小可通过参数buffer.memory设置,默认为32m,如果producer发送的速度超过sender拉取消息的速度,则造成消息堆积,消息累加器内存空间不足,此时,producer发送消息阻塞,阻赛超过max.block.ms后,则抛出异常;此外,消息累加器还有一个bufferpool,负责对缓存进行管理,实现缓存复用,每个bytebuffer的大小通过参数batch.size设置,默认为16KB;

当producer发送的消息到达消息累加器时,如果队列尾部的producerBatch可以继续写入消息则写入,否则如果消息大小不超过batch.size,则新建bytebuffer,将消息写入bytebuffer,如果超过则创建实际所需的空间大小将消息写入并插入队列尾部

sender线程从消息累加器中拉取消息,将<分区,Deque<PeoducerBatch>>转化为<Node,List<ProducerBatch>>,node表示消息要发送的为broker节点,随后将<Node,List<ProducerBatch>>转化为<Node,Request>进行消息发送;

同时,InFlightRequests会缓存已经发出去但是还没有收到响应的请求,具体保存形式为为Map<Node,Deque<Request>>,每个node对应的未响应请求的长度可以通过max.in.flight.requests.per.connection设置,每个node对应的未响应请求长度默认为5,超过则不能发送请求

 

producer元数据更新:producer向最小负载节点发送请求来更新元数据,最小负载节点即为缓存请求inFlightRequests长度最小对应的broker节点,元数据更新请求同样会加入inFlightRequests中

 

消息发送方式:同步发送、异步发送、发后即忘

同步发送:消息发送后会同步返回消息发送的响应信息,需要阻塞等到一条消息发送完成后才能发送下一条消息

异步发送:消息发送的响应信息会通过异步回调函数反馈

发后即忘:消息发送后不会有任何响应信息,消息容易丢失

同步、异步发送异常可以重试,重试次数可通过参数retries设置,若重试最大次数后,仍然发送失败则报错

 

*******************************************

消息传输保障

 

at most once:最多一次,消息不会重复传输,但是可能会造成消息丢失;

exactly once:只有一次,每条消息只会被传输一次;

at least once:每条消息至少传输一次,消息不会丢失,但会重复

 

producer如果设置了重试发送时遇到网络故障,客户端没有收到响应,produce重新发送消息,此时消息传输属于at least once;

consumer消费时,如果拉取消息后立刻提交位移,则消息传输属于at most once,如果消费成功后提交位移,则消息传输属于at least once

 

*******************************************

幂等:同一请求对一个接口的多次调用与一次调用得到的结果是相同的

 

producer发送消息进行重试的时候,消息重复发送,broker可能会存储重复的消息,kafka从0.11.0.0版本开始引入幂等,实现exactly once消息传输语义

 

实现方式:producer开启参数enable.idempotence,将其设置为true,该参数默认为false

注意:producer开启幂等后,

retries会设置为Integer.MAX_VALUE,如果显示配置,该值需大于1

max.in.flight.requests.per.connection会设置为为5,如果显示配置,该值不能大于5

acks会设置为为-1,如果显示配置,该值要配置为-1

一般来说,开启幂等后,不推荐显示配置这些参数,使用默认的即可

 

实现原理:开启幂等后,每个producer都会被分配一个producer id,producer针对每个发送消息的分区都有一个***,每往分区发送一个消息,***加1,<producer id,partition>==》sequence,

broker会在内存中存储该对应关系,<producer id,partition> ==》sequence,

如果sequence new =sequence old + 1,则接受消息;

如果sequence new < sequence old +1,表示消息重复发送,直接拒绝; 

如果sequence new > sequence old +1,表示中间有消息丢失,报错outOfOldSequenceException,后续的消息发送等操作也会终止

 

*******************************************

事务:kafka从0.11.0.0引入事务特性,可以保证跨分区写入消息的原子性,事务内的写入操作要么一起成功,要么一起失败

 

实现方式:

将producer的transaction.id设置为非空,不同的producer设置的transaction id不能相同,否则之前的producer会报错ProducerFencedException,不能正常使用,最新的producer可以正常使用;

broker会根据transaction id为producer分配一个producer id,同时为其分配一个单调递增的producer epoch;

当producer故障后,可以设置相同transaction id的producer,该producer可将之前故障的producer未完成的事务提交或者回滚;

同时enable.idempotence设置为true,开启幂等功能,如果没有显示设置为true,开启事务功能后,幂等功能也会开启,如果开启事务功能后,如果显示地将enable.idempotence设置为false,会报错ConfigException

 

消费端的不能保证所有提交的事务被消费,原因如下:

消息没有被消费就因过期被删除;

broker采用日志压缩,具有相同key的旧的事务消息被删除

事务消息跨多个分区,consumer没有分配到所有的分区,只分配到部分分区;

consumer可以指定消费的偏移量,指定消费的位置有可能跳过事务消息;

 

事务相关方法

 

 

 

**************************************************

consumer 消息消费

 

                 kafka 概述

消费者消费流程:consumer group订阅topic,topic的分区按某种分配策略分配给consumer group中的每个consumer,consumer从分配到的分区中拉取消息,并提交消费位移(位移提交默认为自动提交,可通过参数enable.auto.commit设置,默认为true,自动提交位移的周期为5s,可通过auto.commit.interval.ms设置,默认为5s),

 

*******************************************

分区分配策略:RangeAssignor、RoundRobinAssignor、StickyAssignor

RangeAssignor:分区总数partitionNum除消费者consumerNum得一个跨度,然后分配给每个consumer,如果不能整除,排在前面的consumer分得较多的分区,该策略是默认的分区分配策略

示例:

topic: p1、p2、p3、p4      consumerGroup:consumer1、consumer2

分区分配结果  consumer1:p1、p2        consumer2:p3、p4

 

topic:p1、p2、p3       consumerGroup:consumer1、consumer2

分区分配结果:consumer1:p1、p2    consumer2:p3

 

RoundRobinAssignor:将分区排序,然后轮询分发给consumer

示例:

topic:p1、p2、p3、p4         consumerGroup:consumer1、consumer2

分区分配结果:consumer1:p1、p3      consumer2:p2、p4

 

stickyAssignor:最终分配结果尽可能均匀、分区重分配时尽可能与上次分配相同

 

 

 

*****************************************************

broker 日志存储

 

      kafka 概述

producer将消息发送到某个topic,经分区器分区后分到topic下的某个partition,最终写请求发送到partition的leader副本中follower副本同步leader副本中的日志数据;消费者分配到某个分区时,也是从leader副本中拉取消息消费

一个分区对应一个log目录,为防止单个文件过大,broker使用了日志分段目录logsegment,log目录可包含多个日志分段目录,logsegment包含日志数据文件、偏移量索引文件、时间戳索引文件、提交点文件等,日志数据文件每写入一定的数据量就会在索引文件中添加一次索引记录,写入数据量的大小可通过参数log.index.interval.bytes设置,默认为4KB

 

logsegment:目录大小默认为1G,可通过参数log.segment.bytes设置

logsegment的切割条件:

日志分段文件的大小超过log.segment.bytes设定的大小,默认为1G;

偏移量或者时间戳索引文件的大小超过log.index.size.max.bytes,默认为10m;

日志数据追加消息的时间戳与起始日志时间戳的差值超过log.roll.ms或者log.roll.hours设定的值,如果同时设置log.roll.ms的优先级较高,默认只设置了log.roll.hours,默认为7天

追加消息的偏移量与日志数据的起始偏移量超过Integer.MAX_VALUE

索引文件会预分配log.index.size.max.bytes参数指定的大小,默认为10M,当创建新的logsegment时,会将索引文件切割为实际的使用的数据空间大小

 

日志文件:存储producer发送的消息数据,producerBatch在日志文件中以RecordBatch的形式存储,每个RecordBatch都有一个偏移量

kafka写入日志数据采用顺序写,在日志文件后面追加数据,写入过程使用了页缓存,数据先写入 缓存中,由系统负责刷盘操作,将缓存中的数据写入磁盘,刷盘分为同步刷盘、异步刷盘(异步刷盘的时间间隔可通过参数log.flush.interval.messages、log.flush.interval.ms参数控制),同步刷盘可提高消息的可靠性,但会影响写入性能,消息的可靠性应该通过多副本来保障,一般将刷盘设置为异步刷盘即可

                                                                                          

                            kafka 概述

                   kafka 概述                   

偏移量索引文件:存储相对偏移量和在日志数据中的物理地址

偏移量索引查询流程:

先找出比查询的比查询的偏移量小的最大的baseOffset,计算相对偏移量R1;

再在偏移量索引文件中查找比R1小的最大的相对偏移量R2,找到对应的position;

根据position在日志文件中查找相应的消息

 

                          kafka 概述

                  kafka 概述

时间戳索引文件:存储当前日志的最大时间戳和相对偏移量,相对偏移量与偏移量索引文件中的偏移量可不相同

时间戳索引文件查询流程:

根据时间戳T1查找所有日志分段最大时间戳比T1大的最小的日志分段;

在时间戳索引文件中用二分法查找比T1小的最大的时间戳,得出相对偏移量R1;

在相对偏移量索引文件中查找比R1小的最大的偏移量,得到对应的position;

在日志数据文件中根据position查找消息数据

 

日志清理:日志删除、日志压缩

日志删除:日志时间超出指定时间则删除,可用参数log.retention.ms、log.retention.minutes、log.retention.hours指定,log.retention.ms的优先级最高

日志压缩:对设置了key的消息进行日志清理,如果多个消息设置了相同的key,只保留最新的消息,之前的消息value设为null,随后删除

 

*******************************************

broker控制器:负责管理集群的topic、分区及副本的状态

选举过程:各broker在zookeeper注册临时节点/controller,注册成功的broker选举成为控制器,同时会更新持久节点 /controller_epoch的数据值,该数据表示controller的选举次数,每个和控制器交互的请求中都会携带controller_epoch,如果请求中的epoch小于内存中存储的epoch,则表示请求过期,如果请求的epoch大于内存中的epoch,表示新的conreoller已经选举出来了,controller失效,因此,可通过epoch控制broker控制器的唯一性

主要作用:

监听分区的变化,当ISR集合变更时,通知其他broker更新本地元数据;

监听主题topic变化,创建、修改、删除主题时,通知其他broker更新元数据;

监听broker变化,当增减broker时,在zookeeper相应节点添加监听事件,通知其他broker节点更新元数据;

leader副本故障时,负责leader副本的选举

 

leader副本的选举:当leader副本故障后,选举AR集合中第一个处于ISR集合中的副本

 

*******************************************

消费者协调器与组协调器:负责协调消费组内consumer的分区分配以及分区的再均衡

 

组协调器group coordinator:每个consumerGroup都对应一个groupCoordinator,负责本消费组的分区分配。group coordinator的计算方式为:Utils.abs(groupId.hashcode)% partition_num of _consumer_offsets得到对应的分区数,分区对应的leader副本所在的broker即为consumer group的协调器

消费者协调器consumer coordinator:第一个加入consumerGroup的consumer选举为consumerCoordinator,如果该consumer故障或者退出,后续近似随机选举任意一个consumer为consumer coordinator

触发再均衡操作:

新的消费者加入consumer group;

consumer group中的consumer故障或者主动退出;

consumer group订阅的topic发生变更;

consumer coordinator发生变更

 

新的consumer加入consumer group的工作流程:

                                  kafka 概述

 

                                   kafka 概述

 

                                   kafka 概述

                                    

过最小负载节点发送查找consumer group对应的组协调器的请求findCoordinatorRequest,查找到组协调器后,与组协调器建立连接;

向组协调器发送请求joinGroupRequest,加入组协调器(组协调器会为consumer分配一个consumer id,同时在消费者中选举出consumer coordinator),请求中携带消费者支持的分区分配策略和consumer的订阅信息组;

协调器为consumer汇总分配策略,形成候选集,consumer在候选集中投票给第一个支持的策略,得票最多的策略几位最终的分配策略,如果选出的分配策略某个consumer不支持,则会报错;

group coordinator将分配策略发送给conusmer coordinator;

consumer向group coordinator发送syncGroupRequest,consumer coordinattor的请求中携带分配方案,group coordinator收到分配方案后,转发给其它的consumer;

之后consumer开启定时心跳,向group coordinator报告consumer可以正常工作,心跳间隔可通过参数heartbeat.interval.ms设置,默认为3s;如果在某一时间段内group coordinator没有收到consumer的心跳,则group coordinator认为consumer下线,触发再均衡操作,该时间段通过参数session.timeout.ms设置,session.timeout.ms得知要在group.min.session.timeout.ms(默认为6s)与group.max.session.timeout.ms(默认为5分钟)之间