kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:[email protected],如有任何学术交流,可随时联系。

kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

1 Reactor单线程案例代码热热身

  • 如下是单线程的JAVA NIO编程模型。

  • 首先服务端创建ServerSocketChannel对象,并注册到Select上OP_ACCEPT事件,然后ServerSocketChannel负责监听指定端口上的连接请求。

  • 客户端一旦连接上ServerSocketChannel,就会触发Acceptor来处理OP_ACCEPT事件,并为来自客户端的连接创建Socket Channel,并设置为非阻塞模式,并在其Selector上注册OP_READ或者OP_WRITE,最终实现客户端与服务端的连接建立和数据通道打通。

  • 当客户端向建立的SocketChannel发送请求时,服务端的Selector就会监听到OP_READ事件,并触发相应的处理逻辑。当服务端向客户端写数据时,会触发服务端Selector的OP_WRITE事件,从而执行响应的处理逻辑。

  • 这里有一个明显的问题,就是所有时间的处理逻辑都是在Acceptor单线程完成的,在并发连接数较小,数据量较小的场景下,是没有问题的,但是…

  • Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.

  • 优化点在于:通道连接|读取或写入|业务处理均采用单线程来处理。通过线程池或者MessageQueue共享队列,进一步优化了高并发的处理要求,这样就解决了同一时间出现大量I/O事件时,单独的Select就可能在分发事件时阻塞(或延时),而成为瓶颈的问题。
    kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

      public class NioEchoServer {
      private static final int BUF_SIZE = 256;
      private static final int TIMEOUT = 3000;
    
      public static void main(String args[]) throws Exception {
          // 打开服务端 Socket
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
          // 打开 Selector
          Selector selector = Selector.open();
    
          // 服务端 Socket 监听8080端口, 并配置为非阻塞模式
          serverSocketChannel.socket().bind(new InetSocketAddress(8080));
          serverSocketChannel.configureBlocking(false);
    
          // 将 channel 注册到 selector 中.
          // 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
          // 注册到 Selector 中.
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
          while (true) {
              // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
              if (selector.select(TIMEOUT) == 0) {
                  System.out.print(".");
                  continue;
              }
    
              // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
              Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
    
              while (keyIterator.hasNext()) {
    
                  SelectionKey key = keyIterator.next();
    
                  // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
                  keyIterator.remove();
    
                  if (key.isAcceptable()) {
                      // 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
                      // 代表客户端的连接
                      // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
                      // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
                      SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                      clientChannel.configureBlocking(false);
                      //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
                      // 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
                      clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                  }
    
                  if (key.isReadable()) {
                      SocketChannel clientChannel = (SocketChannel) key.channel();
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      long bytesRead = clientChannel.read(buf);
                      if (bytesRead == -1) {
                          clientChannel.close();
                      } else if (bytesRead > 0) {
                          key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                          System.out.println("Get data length: " + bytesRead);
                      }
                  }
    
                  if (key.isValid() && key.isWritable()) {
                      ByteBuffer buf = (ByteBuffer) key.attachment();
                      buf.flip();
                      SocketChannel clientChannel = (SocketChannel) key.channel();
    
                      clientChannel.write(buf);
    
                      if (!buf.hasRemaining()) {
                          key.interestOps(OP_READ);
                      }
                      buf.compact();
                  }
              }
          }
      }
    

}

2 Kafka Reactor模式设计思路

  • SelectionKey.OP_READ:Socket 读事件,以从远程发送过来了相应数据

  • SelectionKey.OP_WRITE:Socket写事件,即向远程发送数据

  • SelectionKey.OP_CONNECT:Socket连接事件,用来客户端同远程Server建立连接的时候注册到Selector,当连接建立以后,即对应的SocketChannel已经准备好了,用户可以从对应的key上取出SocketChannel.

  • SelectionKey.OP_ACCEPT:Socket连接接受事件,用来服务器端通过ServerSocketChannel绑定了对某个端口的监听,然后会让其SocketChannel对应的socket注册到服务端的Selector上,并关注该OP_ACCEPT事件。

  • Kafka的网络层入口类是SocketServer。
    我们知道,kafka.Kafka是Kafka Broker的入口类,kafka.Kafka.main()是Kafka Server的main()方法,即Kafka Broker的启动入口。我们跟踪代码,即沿着方法调用栈kafka.Kafka.main() -> KafkaServerStartable() -> KafkaServer().startup可以从main()方法入口一直跟踪到SocketServer即网络层对象的创建,这意味着Kafka Server启动的时候会初始化并启动SocketServer。

  • Acceptor的构造方法中,首先通过openServerSocket()打开自己负责的EndPoint的Socket,即打开端口并启动监听。
    然后,Acceptor会负责构造自己管理的一个或者多个Processor对象。其实,每一个Processor都是一个独立线程。

       private[kafka] class Acceptor(val endPoint: EndPoint,
                                        val sendBufferSize: Int,
                                        val recvBufferSize: Int,
                                        brokerId: Int,
                                        processors: Array[Processor],
                                        connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
      
        private val nioSelector = NSelector.open()
        val serverChannel = openServerSocket(endPoint.host, endPoint.port)//创建一个ServerSocketChannel,监听endPoint.host, endPoint.port套接字
      
        //Acceptor被构造的时候就会启动所有的processor线程
        this.synchronized {
          //每个processor创建一个单独线程
          processors.foreach { processor =>
            Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
          }
        }
    
  • Acceptor线程的run()方法,是不断监听对应ServerChannel上的连接请求,如果有新的连接请求,就选择出一个Processor,用来处理这个请求,将这个新连接交付给Processor是在方法Acceptor.accept()

     def accept(key: SelectionKey, processor: Processor) {
         val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//取出channel
         val socketChannel = serverSocketChannel.accept()//创建socketChannel,专门负责与这个客户端的连接
         try {
           //socketChannel参数设置
           processor.accept(socketChannel)//将SocketChannel交给process进行处理
         } catch {
           //异常处理
         }
       }
     
     //Processor.accept():
      /**
        * Queue up a new connection for reading
        */
       def accept(socketChannel: SocketChannel) {
         newConnections.add(socketChannel)
         wakeup()
       }
    
  • 每一个Processor都维护了一个单独的KSelector对象,这个KSelector只负责这个Processor上所有channel的监听。这样最大程度上保证了不同Processor线程之间的完全并行和业务隔离,尽管,在异步IO情况下,一个Selector负责成百上千个socketChannel的状态监控也不会带来效率问题。

       override def run() {
          startupComplete()//表示初始化流程已经结束,通过这个CountDownLatch代表初始化已经结束,这个Processor已经开始正常运行了
          while (isRunning) {
            try {
              // setup any new connections that have been queued up
              configureNewConnections()//为已经接受的请求注册OR_READ事件
              // register any new responses for writing
              processNewResponses()//处理响应队列,这个响应队列是Handler线程处理以后的结果,会交付给RequestChannel.responseQueue.同时调用unmute,开始接受请求
              poll()  //调用KSelector.poll(),进行真正的数据读写
              processCompletedReceives()//调用mute,停止接受新的请求
              processCompletedSends()
              processDisconnected()
            } catch {
              //异常处理 略
          }
      
          debug("Closing selector - processor " + id)
          swallowError(closeAll())
          shutdownComplete()
       }
    
  • KSelector.register()方法,开始对远程客户端或者其它服务器的读请求(OP_READ)进行绑定和处理。KSelect.register()方法,会将服务端的SocketChannel注册到服务器端的nioSelector,并关注SelectionKey.OP_READ,即,如果发生读请求,可以取出对应的Channel进行处理。这里的Channel也是Kafka经过封装以后的KafkaChannel对象

      public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
              SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
              //如果是SocketServer创建的这个对象并且是纯文本,则channelBuilder是@Code PlainTextChannelBuilder
              KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);//构造一个KafkaChannel
              key.attach(channel);//将KafkaChannel对象attach到这个registration,以后可以通过调用SelectionKey.attachment()获得这个对象
              this.channels.put(id, channel);//记录这个Channel
          }
    
  • Processor.processCompletedReceives()通过遍历completedReceives,对于每一个已经完成接收的数据,对数据进行解析和封装,交付给RequestChannel,RequestChannel会交付给具体的业务处理层进行处理。

    * 将completedReceived中的对象进行封装,交付给requestQueue.completRequets
     */
    private def processCompletedReceives() {
      selector.completedReceives.asScala.foreach { receive =>//每一个receive是一个NetworkReceivedui'xiagn
        try {
          //receive.source代表了这个请求的发送者的身份,KSelector保存了channel另一端的身份和对应的SocketChannel之间的对应关系
          val channel = selector.channel(receive.source)
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
            channel.socketAddress)
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          requestChannel.sendRequest(req)//将请求通过RequestChannel.requestQueue交付给Handler
          selector.mute(receive.source)//不再接受Read请求,发送响应之前,不可以再接收任何请求
        } catch {
          //异常处理 略
        }
      }
    }
    

kafka集群Broker端基于Reactor模式请求处理流程深入剖析-kafka商业环境实战

  • 详情源码剖析请参考如下博客,讲解非常详细。

      https://blog.csdn.net/zhanyuanlin/article/details/76556578
      https://blog.csdn.net/zhanyuanlin/article/details/76906583
    
  • RequestChannel 负责消息从网络层转接到业务层,以及将业务层的处理结果交付给网络层进而返回给客户端。每一个SocketServer只有一个RequestChannel对象,在SocketServer中构造。RequestChannel构造方法中初始化了requestQueue,用来存放网络层接收到的请求,这些请求即将交付给业务层进行处理。同时,初始化了responseQueues,为每一个Processor建立了一个response队列,用来存放这个Processor的一个或者多个Response,这些response即将交付给网络层返回给客户端。

      //创建RequestChannel,有totalProcessorThreads个responseQueue队列,
        val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
      class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
        private var responseListeners: List[(Int) => Unit] = Nil
        //request存放了所有Processor接收到的远程请求,负责把requestQueue中的请求交付给具体业务逻辑进行处理
        private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
        //responseQueues存放了所有Processor的带出来的response,即每一个Processor都有一个response queue
        private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
        for(i <- 0 until numProcessors) //初始化responseQueues
          responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
      
        //一些metrics用来监控request和response的数量,代码略
        }
    
  • KafkaApis是Kafka的API接口层,可以理解为一个工具类,职责就是解析请求然后获取请求类型,根据请求类型将请求交付给对应的业务层

      class KafkaRequestHandlerPool(val brokerId: Int,
                                val requestChannel: RequestChannel,
                                val apis: KafkaApis,
                                numThreads: Int) extends Logging with KafkaMetricsGroup {
          
            /* a meter to track the average free capacity of the request handlers */
            private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
          
            this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
            val threads = new Array[Thread](numThreads)
            //初始化由KafkaRequestHandler线程构成的线程数组
            val runnables = new Array[KafkaRequestHandler](numThreads)
            for(i <- 0 until numThreads) {
              runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
              threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
              threads(i).start()
    }
    
  • KafkaRequestHandler.run()方法,就是不断从requestQueue中取出请求,调用API层业务处理逻辑进行处理

       def run() {
          while(true) {
            try {
              var req : RequestChannel.Request = null
              while (req == null) {
              //略
              req = requestChannel.receiveRequest(300)//从RequestChannel.requestQueue中取出请求
              //略
              apis.handle(req)//调用KafkaApi.handle(),将请求交付给业务
            } catch {}
          }
        }
    

3 参数调优设置

  • numProcessorThreads:通过num.network.threads进行配置,单个Acceptor所管理的Processor对象的数量。
  • maxQueuedRequests:通过queued.max.requests进行配置,请求队列所允许的最大的未响应请求的数量,用来给ConnectionQuotas进行请求限额控制,避免Kafka Server产生过大的网络负载;
  • totalProcessorThreads:计算方式为numProcessorThreads * endpoints.size,即单台机器总的Processor的数量;
  • maxConnectionsPerIp:配置项为max.connections.per.ip,单个IP上的最大连接数,用来给ConnectionQuotas控制连接数;
  • num.io.threads:表示KafkaRequestHander实际从队列中获取请求进行执行的线程数,默认是8个。

4 总结

  • 通过Acceptor、Processor、RequestChannel、KafkaRequestHandler以及KafkaApis多个角色的解析,完成了整个Kafka的消息流通闭环,即从客户端建立连接、发送请求给Kafka Server的Acceptor进行处理,进一步交由Processor、Kafka Server将请求交付给KafkaRequestHandler具体业务进行处理、业务将处理结果返回给网络层、网络层将结果通过NIO返回给客户端。

  • 由于多Processor线程、以及KafkaRequestHandlerPoll线程池的存在,通过交付-获取的方式而不是阻塞等待的方式,让整个消息处理实现完全的异步化,各个角色各司其职,模块之间无耦合,线程之间或者相互竞争任务,或者被上层安排处理部分任务,整个效率非常高,结构也相当清晰

  • 本文参考了大量技术博客,加上个人的理解,通过走读源码完成这篇学习笔记,辛苦成文,实属不易,各自珍惜。

  • 秦凯新 于深圳