kafka原理以及源码相关总结
kafka相关总结
Kafka相关
一些有关kakfa的总结,包括生产者和消费者的一些处理流程和原理,还有一些参数的作用。
一.特点
1.应用解耦
多个应用通过消息队列对相同小心进行处理,相互独立,互不影响。
2.异步处理
相比于串行和并行,异步处理可以减少时间。
比如一个请求要落库,要发送发短信和邮件,假设每个操作10ms。
2.1 串行方式大约30ms。
2.2 并行方式大约20ms。
2.3 消息队列并行方式小于20ms
3.数据限流
流程高峰时期,可以进行流量控制,避免流量过大压垮系统。
4.消息通信
可以实现点对点消息队列或聊天室等。
二.结构
0. 元数据(集群节点信息,分区信息,主题和分区关系等)
元数据保存了集群的节点列表、主题和分区的关系、节点和分区列表,分区副本leader等信息。开始的时候需要向服务端请求,或者必要的时候更新,主要分为ProducerMetadata生产者元数据和ConsumerMetadata消费元数据。
1.生产者
1.1相关重要配置
1.1.1 acks确认机制:
值 | 说明 | 可能存在的问题 |
---|---|---|
0 | 只发消息,不管服务器是否落盘就认为成功了。 | 消息还在发,leader挂了,可能会导致消息丢失。 |
1 | leader分区接受消息,写入磁盘就算成功,不管其他follower同步(默认)。 | leader接受消息了,还没同步给follower,leader挂了,可能会导致消息丢失。 |
all | leader接受后,还要求ISR列表里的follower全同步完成才算成功。 | 时间性能问题,必须要有副本,否则挂了就没了。 |
可能要根据具体业务对消息数据的需求来进行调整。
1.1.2 buffer-memory缓冲区大小
生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为32M。
如果太小,可能造成被写满了,还没来得及发送,导致用户写线程被阻塞等待了,太大的话可能造成浪费,具体还是要看压测或者生产环境来进行调整。
1.1.3 batch-size发送批次大小
每当多个记录被发送到同一分区时,生产者尝试将消息一起批量发送, 提升客户端和服务器上的性能,减少发送IO次数,但是可能消息少的时候就会实时性会有影响,默认值为16K。
为了提高效率,消息被分批次写入Kafka 。批次就是一组消息,这些消息属于同一个主题和分区。如果每一个消息都单独穿行于网络,会导致大量的网络开销,而且还会,把消息分成批次传输可以减少网络开销。
要在时间延迟和吞吐量之间作出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次越小,传输次数越多,IO次数和用户态内核态切换次数也越多,开销越大。批次数据可以被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。
如果一个batch没凑满怎么办呢,linger.ms。
1.1.4 linger.ms设置延迟多久要发送,类似于TCP的nagal算法
如果一个批次的数据不够就不会发送,这样可能会带来大的延迟,所以要设定一个延迟时间,到了这个点就要发送,也就是linger.ms,默认是0,直接发送。
1.1.5 retries重试次数和retries.backoff.ms重试间隔
如果客户端发送数据失败将会重试,但是这样可能会对数据的顺序造成影响,比如两个消息发到同一个分区,如果第一个消息失败,第二个消息成功,那重发一个消息,这样第一个消息就在第二个消息后面了。
1.1.6 max.request.size最大发送的请求消息大小
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指
单个请求里所有消息总的大小。,默认值为1MB。
1.1.7 compression.type压缩算法
默认情况下,消息发送时不会被压缩。该参数可以设置为snappy 、gzip 或lz4 ,它指定了
消息被发送给broker之前使用哪一种压缩算也进行压缩。snappy 压缩算怯由Google巳发明,
它占用较少的CPU ,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网
络带宽,可以使用这种算告。gzip压缩算蓓一般会占用较多的CPU ,但会提供更高的压缩
比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和
存储开销,而这往往是向Kafka发送消息的瓶颈所在。
1.1.8 max.block.ms阻塞超时
该参数指定了在调用send方法时生产者的阻塞时间。当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方屈就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。
1.1.9 max.in.flight.requests.per.connection了生产者在收到服务器晌应之前可以发送多少个消息
它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
如果把retries设为非零整数,同时把max.in.flight.requests.per.connection设为比1大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是
很关键的,所以不建议把retries设为0 。可以把max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批悄息时,就不会有其他的消息发送出去。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
1.2 主要功能组件
1.2.1 RecordAccumulator消息累加器
对要发送的消息进行分批收集,根据节点和分区封装成不同的客户端请求,放入通道缓存中,设置写事件,等待Sender线程任务发送。
1.2.1 Sender线程发送任务
进行真正的消息发送,通过轮询,处理写事件发送封装好的客户端请求,将批量消息发送到对应节点中。
1.2.1 Partitioner分区选择器
对消息进行分区,默认是轮询分区,如果设置了key的话,根据key进行hash对分区数取模来选择分区。
1.3 消息发送结构框图
- 对消息拦截处理。
- 对key和value进行序列化。
- 选择分区。
- 将消息放入消息累加器的批次中。
- sender线程从累加器中获取消息。
- 封装客户端请求ClientRequest。
- 将请求交给NetworkClient,准备发送。
- 将请求放入kafkaChannel的缓存里。
- 选择器轮询,处理写事件,发送请求。
- 收到节点响应进行回调。
1.3.1 发送批量消息细节和消息结构
消息被层层封装到某个批次里,批次又被封装到某个分区对应的双向队列里,全都存在累加器的映射里,发送的时候会按照分区主副本进行整合,封装成客户端请求,发送给对应的主副本节点。
1.3.2 发送流程细节
1.2.1 轮询
处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。
1.2.2 RecordAccumulator消息累加器添加新消息过程
记录收集器的作用是缓存客户端的消息,还需要通过消息发送线程Sender才能将消息发送到服务端。
1.2.3 批量整合发送优化
Sender线程去迭代batches的时候,将同一个节点的所有分区放一起,在一个请求中发送,这样比所有分区都分开发送效率高,减少了IO开销。
1.2.4 Sender线程处理消息累加器
1.2.5 NetworkClient的poll真正的发送
1.2.6 发送重要组件
2. 消费者
2.1 相关重要配置
2.1.1 auto.offset.reset偏移量无效的情况处理方式
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长
时间失效,包含偏移量的记录已经过时井被删除)该作何处理。它的默认值是latest , 意
思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之
后生成的记录)。另一个值是earliest ,意思是说,在偏移量无效的情况下,消费者将从
起始位置读取分区的记录。
2.1.2 enable.auto.commit是否自动提交偏移量
该属性指定了消费者是否自动提交偏移量,默认值是true 。为了尽量避免出现重复数据和数据丢失,可以把它设为false ,由自己控制何时提交偏移量。如果把它设为true ,还可以通过配置auto-commit-interval属性来控制提交的频率。
2.1.3 partition.assignment.strategy分区分配策略
Rannge:将某个主题的分区分范围,然后分给消费者。
RoundRobin:轮询分,可能有消费者会多一个。
StickyAssignor:再平衡的时候尽量保持已经分配分区的消费者分区变动比较小。
2.1.4 max-poll-records
消费线程每次从缓存的数据中最多获取多少条数据进行处理,比如缓存里有1000条数据的量,每次获取500条,就需要两次,获取完了之后会继续去节点拉取消息。
2.1.5 max-partition-fetch-bytes
某个消费者线程一次从kafka某个节点获取数据的大小,包含很多条具体消息,拉过来后会放入本地缓存,通过 max-poll-records进行响应数量的读取。
2.1.6 fetch.min.bytes
当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
取值范围是:[0, Integer.Max],默认值是1。
默认值设置为1的目的是:使得consumer的请求能够尽快的返回。
2.1.6 fetch.max.wait.ms
Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。
2.1.7 heartbeat.interval.ms
影响再平衡,每个消费者都会根据 heartbeat.interval.ms 参数指定的时间周期性地向组协调者发送心跳,确保自己存活,组协调者会进行响应,如果发生了再平衡的话,响应里就会出现REBALANCE_IN_PROGRESS,告诉消费者发生了再平衡,如果在session.timeout.ms时间内,没有发送过心跳包,协调器就认为该消费者挂了,会启动再平衡。
2.1.8 session.timeout.ms
影响协调器判断消费者是否挂了,在这个时间段内,没有心跳就认为是挂了,就会踢出消费组,触发再平衡。
2.1.9 max.poll.interval.ms
poll的间隔时间,也可以理解为服务端和客户端约定的超时时间,也就是业务处理时间不能超过这个,超过这个的话,没有进行poll操作的话,会导致提交偏移量失败,就会被认为消费者出问题了,就会进行再平衡。
2.2 消费者主要组件
2.2.1 SubscriptionState订阅状态
记录了拉取状态和消费状态,分区消息
2.2.2 Fetcher消息拉取器
拉取消息。更新分区状态的拉取偏移量(消费者从分区哪里拉取),创建拉取请求,设置到选择器中,设置监听写事件,然后网络客户端进行轮询,处理写事件,将拉取请求发送到节点。
2.2.3 PartitionAssignor分区分配器
用来决定消费者的分区如何分配,具体是协调器会让一个主消费者去执行分区分配算法,一般来时候第一个发送加入组请求的消费者就是主消费者。
2.2.4 ConsumerCoordinator消费者协调器
与服务端的协调者通信。更新分区的提交偏移量,维持消费者心跳。
订阅模式中动态协调分配分区,只有加入消费者组才可以被分配分区,更新分区状态的提交偏移量(消费者消费的进度在哪里)。如果没有获取到提交偏移量,就会根据auto.offset.reset的设置,来获取最早earliest还是最近 latest 的位置作为偏移量。
消费者向协调者申请加入消费组,服务端存在管理消费组的协调者,协调者将消费者加入消费组,协调者为所有消费者分配分区,消费者从协调者获得分配给它的分区,消费者拉取分区的消息。
消费者获取消息的准备工作有: 连接协调者,向协调者发送请求加入消费组,从协调者获得分配
的分区。
分区分配需要所有等待所有消费者都加入消费组后才开始,然后异步响应
2.2.5 ConsumerNetworkClient网路通信客户端
将某个节点的拉取请求ClientRequest放入UnsentRequests中,内部维护某个节点对应的请求队列。
2.2.6 心跳任务
保持和协调器的通信,否则可能会认为不活跃了,就会进行消费者的再平衡,进行重新分区分配。
为了检查消费者是否活着,否则就可能会进行分区的在平衡分配了。
2.2.7 自动提交任务
如果开启了自动提交的话会进行提交消费进度,这样后面新的消费者进来才知道分区的消费情况,不至于重新消费了。
2.3 消费者特点
2.3.1 生产者消费者网络结构
2.3.2 消费者消息处理语义
至多一次:消息最多被处理一次,但是可能会丢失,比如获取消息就提交,然后再处理,处理的时候可能出异常,就丢失了。(目前的做法,可能还有消息丢,但是不会重复消费)
至少一次:消息可能被重复处理,先处理后提交,在提交的时候出异常,然后有会获取同样的消息处理,就重复了。(可能会重复处理)
正好一次:需要消息消费和偏移提交是原子操作,可能需要用到外部存储。(引入外部存储系统可能会有其他新的问题)
3. 协调者
3.1 消费者拉取消息前必须满足条件
连接协调者,已经分配到分区。
3.2 协调者作用
保存提交偏移量,维持消费者心跳,如有消费者离开,需要重新对消费组的消费者分配分区。
消费者加入后,执行分区分配算法,进行分区分配。
处理消费者的加入消费组请求,同步消费组请求。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵,部分图片来字kafka参考书上,侵删。