kafka producer 发送消息的流程分析

producer 是如何将某个topic的一条record发送到该topic对应的某个分区Partition上面去的?

首先分析一个重要的消息载体  ConcurrentMap<TopicPartition, Deque<RecordBatch>> 的结构,它是一个Map(对HashMap进行了包装,HashMap不是线程安全的,而其包装类CopyOnWriteMap是线程安全的),key为TopicPartition, value 为 ArrayDeque<ProducerBatch>。它是RecordAccumulator 的一个成员变量,RecordAccumulator 会为每一个ProducerBatch分配一个batch.size 大小的Buffer,用于存储Byte字节流。

kafka producer 发送消息的流程分析

partitioner 决定Record将会被送往那个Partition,默认是轮询。

Topic00 代表的意义是这个key对应的Value属于Topic0, Value将会送往Topic0对应的0号Partition。

Topic01 代表的意义是这个key对应的Value属于Topic0, Value将会送往Topic0对应的1号Partition。

Topic02 代表的意义是这个key对应的Value属于Topic0, Value将会送往Topic0对应的2号Partition。

根据Record的所属属性Topic和Partition将决定该Record将添加到Map中的相应的队列中去,

Value 为 ArrayDeque<ProducerBatch>,正常情况下往队列中添加一个Batch,该Batch应该被立刻被送往Server,如果peeklast不为null就说明添加的速度大于送往Server端的速度,导致队列积压。 

Hopefully this doesn't happen often...  这种情况不应该经常发生。发生这种情况一般应该是业务高峰期或者网络出现问题或者是参数设置不当导致network-thread线程阻塞。

具体的添加逻辑如下

kafka producer 发送消息的流程分析

尝试将record的信息添加到dq的last batch里面去

kafka producer 发送消息的流程分析

 

重点问题,该map有什么读写上的特征呢?

也许你会认为在不停的往Map中添加数据,然后被添加的数据又会读出被送往Server端,读写应该差不多,其实不然,该Map的get操作远远大于put操作,写操作发生于ArrayDeque的addLast操作和pollFirst操作,这些是get取到ArrayDeque之后对ArrayDeque进行的操作,并不属于对map的写操作。因此这里的Map是一个get操作远远大于put的Map,针对这种情况,API实现者在这里封装了CopyOnWriteMap,以优化读操作,具体的实现如下。

kafka producer 发送消息的流程分析

将Map变为不可变的可以优化读操作???

该map使用volatile关键词进行修饰,get操作没有上锁,put操作有上锁。。。

当然在此也使用synchronized关键字将线程不安全的HashMap变为线程安全的。。。

因为producer是支持多线程操作的。。。      

再解释一个问题,为什么Map的get方法没有上锁,因此需要对拿到的Value进行加锁处理。。。

其实这里相当于一个行级锁。。。

为啥不是ConcurrentHashMap

 

       kafka producer 发送消息的流程分析

 

弄清楚了具体的数据结构和层级关系及调用关系,再来看消息的流转应该会轻松很多。

经过一序列的处理一条Record终于存储到Map的某个queue的batch的MemoryRecordsBuilder的ByteBufferOutputStream的ByteBuffer中了, 那么存储的信息是如何被读出了并通过网络发送给某个broker的呢? 发送是另外一个线程完成的事情。

最后开一个玩笑,这并不是一条insert语句或者一个post请求就将信息传送到后台了,这两个载体太重太重。

刚想拎包回家了,突然想到压缩是在哪个层进行的!!!,那里设置使用哪种压缩算法的!!!

当然还有个超级难的问题,kafka 的事务transaction是如何实现的 !!!!

其实写出线程安全的类并不容易,不要以为有关键词synchronized就万事大吉,因为你还需要关注死锁问题和效率问题。。。并且你需要进行思考。。。

 

                                                                            By4个月大的Java程序员

 

 

 

 

 

转载于:https://my.oschina.net/qidis/blog/1537979