九、RocketMQ Remoting(网络层)
1.问题
- RocketMQ线程模型是如何设计的
- 哪些请求走了10909端口,哪些走10911端口呢
- 如何实现长连接
- 如何实现同步、异步调用的超时控制
2.核心实现
- 1.类图doc/cn/image/rocketmq_design_3
-
- 2.服务端NettyRemotingServer
- 1.构造方法中,完成公共线程池的初始化
- 2.useEpoll:是否使用epoll->初始化netty的boss以及worker
- 条件:islinux && nettyServerConfig.isUseEpollNativeSelector() && Epoll.isAvailable()
- Boss线程初始化1,woker线程初始化3
- 3.NioEventLoopGroup#newChild-> NioEventLoop extends SingleThreadEventLoop-> SingleThreadEventExecutor:持有Thread thread对象,框架启动时,会执行SingleThreadEventExecutor.this.run();run方法的实现中包含NioEventLoop。
- NioEventLoop.run:在while(true)中,不断做select操作。当客户端有连接过来,每个连接对应一个channel,channel会注册到eventloop上,eventloop会寻找当前channel上有无事件,取到事件后处理。
- 4.使用NettyRemotingServer构造方法的代码
- BrokerController.initialize方法
- 调用这个构造方法,构建remotingServer(10911)、fastRemotingServer(10909)两个server,其监听端口不同。BrokerController管理rocketmq各个组件的生命周期。
- this.registerProcessor():注册请求处理器
- BrokerController.initialize方法
- 3.客户端NettyRemotingClient
3.服务端网络组件启动过程
- 1.BrokerController负责管理各组件的生命周期,initialize、start、shutdown分别负责初始化、启动、关闭各组件,在初始化阶段针对不同请求注册processor
- start方法:
- remotingServer.start()
- NettyRemotingServer#start
- ChannelOption.SO_BACKLOG 1024:socket的队列数
- ChannelOption.SO_KEEPALIVE false:rocketmq使用的是长连接,但为什么把keepalive关闭?
- Pipeline初始化:是在defaultEventExecutorGroup线程池基础上挂handler,如NettyEncoder、NettyDecoder等,而并不是使用woker线程池,woker线程池只是转发到这个defaultEventExecutorGroup线程池,所以woker线程池可以配置小些。
- NettyRemotingServer#start
- remotingServer.start()
- start方法:
- 2.processor分为七大类:send(生产者发送消息), pull(消费者拉取消息), query, clientManage(心跳), consumerManage(对于消费进度offset管理), endTransaction(提交或回滚事务), default(控制台命令),不同请求由不同线程池处理
- 3.NettyServerConfig中listenPort默认是8888,但BrokerStartup启动时改成了10911
- 4.分别在10909和10911启动监听,10909为fastRemotingServer,也被称为VIP通道,所有发消息请求和控制命令走10909
4.服务端Handler
1.入站Handler:
- A.HandshakeHandler,第一个入站handler,当客户端配置useTLS=true时生效,为服务端动态创建SSLHandler,并动态删除自己
- 1.取发送消息的第一个字节是HANDSHAKE_MAGIC_CODE,若客户端(生产者)未开启,以下逻辑不走。
- 在pipeline动态增加channel:1.sslcontext增加加解密的handler;2.FileRegionEncoder:将fileRegion(零拷贝实现)转换为byteBuf格式。
- 为什么要转换?byteBuf是何时用?
- SslHandler#write:将msg(类型为byteBuf)写入PendingWriteQueue。在SslHandler#wrap方法中可以看到强转为ByteBuf类型。
- 在pipeline动态增加channel:1.sslcontext增加加解密的handler;2.FileRegionEncoder:将fileRegion(零拷贝实现)转换为byteBuf格式。
- 2.服务端配置使用SSL,但客户端未配置,该链接关闭。
- 3.执行完第一次操作后,将动态的将自己移除掉。
- 1.取发送消息的第一个字节是HANDSHAKE_MAGIC_CODE,若客户端(生产者)未开启,以下逻辑不走。
- B.NettyDecoder,解码器,
- 该解码器作用:根据长度字段获取相应长度的数据包并解析,解决拆包粘包问题。格式协议见rocketmq_design_4,先去掉length(数据包总长度,不包含总长度本身自己)字段,再解析 序列化方式+header length、header内容、body
- NettyDecoder#decode->RemotingCommand#decode(java.nio.ByteBuffer)
- C.NettyServerHandler,处理具体业务逻辑
- ProcessMessageReceived
- REQUEST_COMMAND:收发消息
- RESPONSE_COMMAND:向客户端发送请求
2.出站Handler:
- A.NettyEncoder,编码器,按协议拼装报文NettyEncoder#encode->RemotingCommand#encodeHeader
- B.FileRegionEncoder,只在客户端配置useTLS时生效,将FileRegion转化为ByteBuf,SSLHandler需要ByteBuf类型
3.出入站Handler:
- A.IdleStateHandler,空闲连接检查,依赖defaultEventExecutorGroup线程执行调度任务,发现空闲连接触发IDLE事件,由NettyConnectManageHandler处理IDLE事件,IdleStateHandler#initialize:
- loop.schedule():判断当前线程是否活跃 ,如何实现,看AllIdleTimeoutTask#run
- 符合allIdleTimeNanos <(当前时间 – Max(lastReadTime, )lastWriteTime),认为是不活跃的连接,会封装一个ALL_IDLE事件,然后在pipeline中流转这个事件。当这个事件流转到NettyConnectManageHandler的处理器:NettyConnectManageHandler#userEventTriggered。其中的ClientHousekeepingServic#onChannelIdle来处理,比较粗暴,当连接关闭、异常、空闲的时候,执行关闭连接的操作。
- 记录这个时间lastReadTime:当请求来时,调用channelRead方法;读结束时,调用channelReadComplete赋值lastReadTime。
- loop.schedule():判断当前线程是否活跃 ,如何实现,看AllIdleTimeoutTask#run
- B.NettyConnectManageHandler,连接管理,主要用来关闭连接
- C.SSLHandler,第一个入站handler,最后一个出站handler
4.ChannlePipeline是双向链表,包含头结点、尾节点,所有handler按加入的顺序存放,入站时会从头结点开始依次调用所有入站handler,出站时RocketMQ会调ChannelHandlerContext.writeAndFlush,会从当前handler向前依次调用所有出站handler
- 整个pipeline,是行为传播,事件传播,动态增减handler
5.RemotingCommand协议
- doc/cn/image/rocketmq_design_4
- length,长度字段长度为4,表示type&header length + header + body的长度,不包含本身
- type + header length,该字段长度为4,最高位字节表示序列化类型,其他三个字节表示header字段的长度
- header,定义多种类型的header,POJO
- body,消息体
6.服务端线程模型
- doc/cn/image/rocketmq_design_6
- 典型的多线程Reactor模型,金字塔型,每一层都有专属线程池,逻辑越复杂、耗时越高线程池资源越多
- 不同Handler是在不同的线程中完成任务,每个handler会对应一个线程。如channelRead举例说明
- io.netty.handler.timeout.IdleStateHandler#channelRead
- AbstractChannelHandlerContext#invokeChannelRead
- findContextInbound
- EventExecutor executor = next.executor()
- executor.inEventLoop()):判断当前线程与下一个executor的线程是否为同一线程
- AbstractChannelHandlerContext#invokeChannelRead
- io.netty.handler.timeout.IdleStateHandler#channelRead
- executor.inEventLoop(),避免竞争,串行化
- 线程数是2的幂次的好处之一是将取模操作转化为与操作
7.长连接的实现方法
- 在Java代码中只有属性SO_KEEPALIVE可以设置,默认是false
- 方式1:开启SO_KEEPALIVE,设置true并配置内核参数
- tcp_keepalive_time:最大保活时长
- tcp_keepalive_probes:超过最大保活时间后发送探测包的次数
- tcp_keepalive_intvl:探测包发送时间间隔
- sysctl –a | grep keepalive
- 方式2:应用自主维护心跳和空闲连接检查
- 方式3:使用支持长连接的通信协议,例如Http1.1
8.客户端同步调用
- NettyRemotingClient
- Start()
- invokeAsync
- getAndCreateChannel:非阻塞的创建连接,返回channelFuture对象
- invokeSyncImpl:通过原子的integer(不会超吗)维护请求与返回的映射关系;通过网络层发送请求;同步等待结果(countdownlatch);
- NettyRemotingAbstract#processMessageReceived:RESPONSE_COMMAND:
- 客户端跟Broker和Namesrv的连接是懒加载的,需要时才创建,并缓存,创建Namesrv连接时addr为null,原因是Namesrv地址已经配好,轮询的访问即可
- 非阻塞的创建连接,并等待连接创建或超时,等待和唤醒使用wait, notify
- 同步调用超时控制由CountDownLatch控制
- 发送失败和发送超时是两种异常
- 同步调用响应请求使用的还是当前线程,当前线程在结果返回之前一直是阻塞状态
9.客户端异步、单向调用
- invokeOnewayImpl:并不在意是否发送成功,只是记录日志。
- 通过信号量控制并发NettyRemotingClient#invokeAsync
- 控制并发数:semaphoreAsync.tryAcquire
- SemaphoreReleaseOnlyOnce.realse:通过cas方式只能释放一次
- 异步调用回调默认使用publicExecutor
- 异步的超时控制由调度任务scanResponseTable完成,超时则触发回调并从responseTable中删除
- 异步发送和单向发送的区别是异步发送关注是否发送成功,单向不关注,如果是单向调用服务端也不会返回response
10.问题
1.网络层的异步发送,通过信号量控制并发,控制每个channel的写操作并发数semaphoreAsync.tryAcquire,异步的需要控制,同步不需要控制