kafka 笔记二 kafka生产者

Kafka数据生产流程解析
kafka 笔记二 kafka生产者1. Producer创建时,会创建一个Sender线程并设置为守护线程。
2.
生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
3.
批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
4.
批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。
5.
落盘到broker成功,返回生产元数据给生产者。
6.
元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。
 
原理剖析
kafka 笔记二 kafka生产者KafkaProducer有两个基本线程:
主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;
     1.消息收集器
RecoderAccumulator每个分区都维护了一个Deque<ProducerBatch> 类型的双端队列。
     2.ProducerBatch
可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响;
     3.由于生产者客户端使用
java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小(batch.size 指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。
     4.每次追加一条
ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。

Sender
线程:
     1.该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch>的形式, Node 表示集群的broker节点。
     2.进一步将
<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以 向服务端发送数据。
     3.在发送之前,
Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到 InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。