Kafka SocketServer Reactor with MultiThread (Kafka高并发原理)
使用java nio的Selector(多路复用)技术
每个EndPoint对应一个Acceptor,Acceptor运行在一个Thread中,打开ServerSocketChannel,监听连接事件。
当有新的连接请求进入时,通过Selector可以获取到对应的SelectionKey_Accept。Acceptor把SelectionKey_Accept转交给Processor处理。
一个Acceptor对应多个Processor,每次递送SelectionKey_Accept时,循环选择其中一个Proessor。
各个Processor运行在独立的Thread中,每个Processor拥有一个KafkaSelector,KafkaSelector内含java nio的Selector。当接收到SelectionKey_Accept时:
4.1 从SelectionKey_Accept中获取SocketChannel,注册到KafkaSelector的nio selector,生成用于读写的SelectionKey_RW
4.2 KafkaSelector调用ChannelBuilder生成对应此SelectionKey_RW的KafkaChannel。每个KafkaChannel都含有一个TransportLayer,EndPoint的protocol决定了TransportLayer是否加密(SSL)。TransportLayer利用SelectionKey_RW进行网络数据的读写。Processor会不停地调用KafkaSelector,进而调用nio selector,检查是否有新的Readable SelectionKey产生。如果有,则通过TransportLayer组装出NetworkReceive,再把NetworkReceive parse成对应的Request,然后放入RequestChannel中。
KafkaRequestHandlerPool拥有多个KafkaRequestHandler。每个KafkaRequestHandler运行在一个独立的Thread,会不停从RequestChannel中获取Request,然后调用KafkaApi进行处理。KafkaApi会根据Request的类型进行不同的处理,处理完之后生成Response,如果Response需要返回给客户端,则会serialize成Send,通过TransportLayer进行传输。
可以看出:
1. Acceptor的作用是为了把连接请求分流到不同的Processor
2. Processor为每个请求建立独立的传输通道,通过nio selector统一监听所有通道有木有数据到来
3. 如果通道产生了数据则进行读取,数据读取完整后就进行解析,再把Request放入RequestChannel等待处理。数据本身(NetworkReceive的payload)包含了元数据,描述了自身属于哪种Request。
4. KafkaRequestHandlerPool是个线程池,负责Request的处理,与网络通讯分离开。处理完的结果再次放回到RequestChannel,Processor会从RequestChannel里面提取结果最后决定是否要传输出去。
5. Acceptor和Processor分离了网络连接请求和连接建立后的读写请求,而RequestChannel则解耦了网络通讯和业务逻辑处理。
Acceptor的工作流程
Processor的工作流程
KafkaSelector poll方法的工作流程
KafkaSelector pollSelectionKeys的工作流程
NetworkReceive的读取
1. NetworkReceive包含2个ByteBuffer,一个是用来读取requestedBufferSize,另外一个是用来读取真正的数据,读取数据的ByteBuffer的size就由requestedBufferSize来指定。
2. 在引入MemoryPool来控制内存的分配和回收后(默认使用的MemoryPool可以认为不做任何限制),如果读取到的requestedBufferSize > MemoryPool当前可用的内存,则会mute掉当前的KafkaChannel,等到后面MemoryPool有足够内存时再unmute。
3. 只有2个ByteBuffer都读取完整后,NetworkReceive才会被认为已经读取成功。