Kafka SocketServer Reactor with MultiThread (Kafka高并发原理)

Kafka SocketServer Reactor with MultiThread (Kafka高并发原理)

使用java nio的Selector(多路复用)技术

  1. 每个EndPoint对应一个Acceptor,Acceptor运行在一个Thread中,打开ServerSocketChannel,监听连接事件。

  2. 当有新的连接请求进入时,通过Selector可以获取到对应的SelectionKey_Accept。Acceptor把SelectionKey_Accept转交给Processor处理。

  3. 一个Acceptor对应多个Processor,每次递送SelectionKey_Accept时,循环选择其中一个Proessor。

  4. 各个Processor运行在独立的Thread中,每个Processor拥有一个KafkaSelector,KafkaSelector内含java nio的Selector。当接收到SelectionKey_Accept时:
    4.1 从SelectionKey_Accept中获取SocketChannel,注册到KafkaSelector的nio selector,生成用于读写的SelectionKey_RW
    4.2 KafkaSelector调用ChannelBuilder生成对应此SelectionKey_RW的KafkaChannel。每个KafkaChannel都含有一个TransportLayer,EndPoint的protocol决定了TransportLayer是否加密(SSL)。TransportLayer利用SelectionKey_RW进行网络数据的读写。

  5. Processor会不停地调用KafkaSelector,进而调用nio selector,检查是否有新的Readable SelectionKey产生。如果有,则通过TransportLayer组装出NetworkReceive,再把NetworkReceive parse成对应的Request,然后放入RequestChannel中。

  6. KafkaRequestHandlerPool拥有多个KafkaRequestHandler。每个KafkaRequestHandler运行在一个独立的Thread,会不停从RequestChannel中获取Request,然后调用KafkaApi进行处理。KafkaApi会根据Request的类型进行不同的处理,处理完之后生成Response,如果Response需要返回给客户端,则会serialize成Send,通过TransportLayer进行传输。

可以看出:
1. Acceptor的作用是为了把连接请求分流到不同的Processor
2. Processor为每个请求建立独立的传输通道,通过nio selector统一监听所有通道有木有数据到来
3. 如果通道产生了数据则进行读取,数据读取完整后就进行解析,再把Request放入RequestChannel等待处理。数据本身(NetworkReceive的payload)包含了元数据,描述了自身属于哪种Request。
4. KafkaRequestHandlerPool是个线程池,负责Request的处理,与网络通讯分离开。处理完的结果再次放回到RequestChannel,Processor会从RequestChannel里面提取结果最后决定是否要传输出去。
5. Acceptor和Processor分离了网络连接请求和连接建立后的读写请求,而RequestChannel则解耦了网络通讯和业务逻辑处理。


Acceptor的工作流程
Kafka SocketServer Reactor with MultiThread (Kafka高并发原理)


Processor的工作流程
Kafka SocketServer Reactor with MultiThread (Kafka高并发原理)


KafkaSelector poll方法的工作流程
Kafka SocketServer Reactor with MultiThread (Kafka高并发原理)

KafkaSelector pollSelectionKeys的工作流程
Kafka SocketServer Reactor with MultiThread (Kafka高并发原理)


NetworkReceive的读取
1. NetworkReceive包含2个ByteBuffer,一个是用来读取requestedBufferSize,另外一个是用来读取真正的数据,读取数据的ByteBuffer的size就由requestedBufferSize来指定。
2. 在引入MemoryPool来控制内存的分配和回收后(默认使用的MemoryPool可以认为不做任何限制),如果读取到的requestedBufferSize > MemoryPool当前可用的内存,则会mute掉当前的KafkaChannel,等到后面MemoryPool有足够内存时再unmute。
3. 只有2个ByteBuffer都读取完整后,NetworkReceive才会被认为已经读取成功。