Dubbo Source Code Analysis (16): Dubbo Transporter

Transport Definition

transport, the network transport layer: it abstracts mina and netty into a unified API, centered on Message; its extension interfaces are Channel, Transporter, Client, Server, and Codec.

The transport here can be understood as the server implementation of a protocol: just as Tomcat is an HTTP server, this is the Dubbo-protocol server.

Speaking of the transport layer, everyone is probably familiar with the OSI seven-layer model. So how does computer networking actually describe the transport layer?

OSI Model

In computer networking, the transport layer is a conceptual division of methods in the layered architecture of protocols in the network stack in the Internet protocol suite and the OSI model. The protocols of this layer provide host-to-host communication services for applications.[1] It provides services such as connection-oriented communication, reliability, flow control, and multiplexing.

(Figure: the OSI seven-layer model; image from the web)

In other words: in computer networking, the transport layer is a conceptual division of methods in the layered protocol architecture of the Internet protocol suite and the OSI model. Protocols at this layer, such as TCP and UDP, provide host-to-host communication services for applications, including connection-oriented communication, reliability, flow control, and multiplexing.

In Dubbo, the Transporter is the implementation of this transport layer. It provides the communication support between Dubbo services: with it, services can talk to each other over the network instead of being information islands.

Transporter

@SPI("netty")
public interface Transporter {

    /**
     * Bind a server.
     */
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
     * Connect to a server.
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

From this interface definition we can see:

  1. Transporter shows that Dubbo also uses a client/server (C/S) architecture.
  2. Transporter provides the ability to create both the server side and the client side.
  3. The default Transporter is NettyTransporter.
  4. The server/client implementation can be configured via the server/client and transporter URL parameters; currently supported types include netty, mina, etc.
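The selection in point 4 is driven by the @Adaptive annotation. Below is a minimal sketch of what the generated adaptive proxy for Transporter#bind effectively does when choosing the extension name. This is a simplification, not Dubbo's generated code; the key strings mirror Constants.SERVER_KEY and Constants.TRANSPORTER_KEY, and the real lookup goes through ExtensionLoader.

```java
import java.util.Map;

// Simplified sketch of the adaptive extension-name lookup for Transporter#bind:
// "server" is checked first, then "transporter", then the @SPI("netty") default.
class AdaptiveTransporterSketch {

    static String chooseExtension(Map<String, String> urlParameters) {
        String name = urlParameters.get("server");       // Constants.SERVER_KEY
        if (name == null) {
            name = urlParameters.get("transporter");     // Constants.TRANSPORTER_KEY
        }
        return name != null ? name : "netty";            // @SPI("netty") default
    }

    public static void main(String[] args) {
        System.out.println(chooseExtension(Map.of("server", "mina")));
        System.out.println(chooseExtension(Map.of()));
    }
}
```

With no relevant URL parameters the default netty implementation is chosen, which matches point 3 above.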

So what exactly is a Transporter? Below, the Netty-based implementation is used as an example.

NettyServer

Fields

(Figure: NettyServer fields)

First, let's look at the fields; here are a few of the important ones:

| Field | Description |
| --- | --- |
| handler | the channel handler that processes client requests |
| codec | the codec that encodes/decodes client requests and server responses into Request and Response objects |

Creating a NettyServer

As you can see, the constructor does nothing itself; it simply calls the super constructor:

public class NettyServer extends AbstractServer implements Server {
 public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

Let's see what the parent class AbstractServer does here:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = Constants.ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // maximum number of connections the server accepts
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            // actually starts the server; a template method implemented by subclasses
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

The code above initializes the local address, the bind address the server exposes, the maximum number of accepted connections (accepts), the idle timeout, and so on.
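The bind-ip fallback in that constructor can be sketched as follows. This is a simplified model with an assumed helper name (resolveBindIp); the real code delegates the invalid-host check to NetUtils.isInvalidLocalHost.

```java
// Simplified sketch of AbstractServer's bind-ip decision: anyhost=true or an
// unusable local host forces binding to 0.0.0.0; otherwise the url host is used.
class BindAddressSketch {

    static final String ANYHOST = "0.0.0.0";

    static String resolveBindIp(String host, boolean anyhost) {
        boolean invalid = host == null || host.isEmpty()
                || "localhost".equalsIgnoreCase(host) || "127.0.0.1".equals(host);
        return (anyhost || invalid) ? ANYHOST : host;
    }

    public static void main(String[] args) {
        System.out.println(resolveBindIp("192.168.1.10", false));
        System.out.println(resolveBindIp("localhost", false));
    }
}
```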

public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        // resolved from the url
        this.codec = getChannelCodec(url);
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }
    
    

In short, this initializes the codec (Codec), the communication timeout (timeout), and the connection-establishment timeout (connect.timeout).
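The getPositiveParameter calls above fall back to the default when the parameter is missing or non-positive. A sketch of that behavior, inferred from the method name rather than copied from Dubbo's URL class:

```java
import java.util.Map;

// Sketch of URL#getPositiveParameter semantics (assumed): missing, unparsable,
// or non-positive values fall back to the supplied default.
class PositiveParameterSketch {

    static int getPositiveParameter(Map<String, String> params, String key, int defaultValue) {
        String raw = params.get(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            int value = Integer.parseInt(raw);
            return value > 0 ? value : defaultValue;
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }
}
```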

 public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

That is all the preparation before starting a Netty server. You may have noticed the doOpen method in AbstractServer; let's see how a server is actually started.

Starting the Server

 protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();
        // the usual Netty bootstrap configuration
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                // configure the decoder
                                .addLast("decoder", adapter.getDecoder())
                                // configure the encoder
                                .addLast("encoder", adapter.getEncoder())
                                // configure the server idle detector: a background timer tracks read/write inactivity; once the channel has been idle for idleTimeout, an IdleStateEvent is fired so downstream handlers can react
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

As you can see, this is a fairly ordinary Netty setup with four handlers: the decoder, the encoder, the server idle detector, and the business handler (nettyServerHandler). The first three are straightforward, but the business handler deserves a closer look, so let's examine the channel handler next.
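The idle detector's job can be modelled without Netty: record the last read/write time and consider the channel idle once idleTimeout elapses. The following is a plain-Java sketch of what IdleStateHandler(0, 0, idleTimeout, MILLISECONDS) checks, not Netty's actual implementation:

```java
// Plain-Java model of all-idle detection: a channel with no read or write for
// idleTimeoutMs is flagged, which in Dubbo leads to the channel being closed.
class IdleCheckSketch {

    private final long idleTimeoutMs;
    private long lastIoTimeMs;

    IdleCheckSketch(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastIoTimeMs = nowMs;
    }

    void onReadOrWrite(long nowMs) {
        lastIoTimeMs = nowMs;   // any I/O resets the idle clock
    }

    boolean isIdle(long nowMs) {
        return nowMs - lastIoTimeMs >= idleTimeoutMs;
    }
}
```

In NettyServerHandler below, the fired IdleStateEvent is handled in userEventTriggered by closing the channel.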

Channel Handler

@io.netty.channel.ChannelHandler.Sharable
public class NettyServerHandler extends ChannelDuplexHandler {
    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>

    private final URL url;

    private final ChannelHandler handler;

    public NettyServerHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

    public Map<String, Channel> getChannels() {
        return channels;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            if (channel != null) {
                channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
            }
            handler.connected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()));
            handler.disconnected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }


    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.sent(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                logger.info("IdleStateEvent triggered, close channel " + channel);
                channel.close();
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.caught(channel, cause);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}

From this class we can observe the following:

  1. The @Sharable annotation marks a channel handler that can be safely shared by multiple channels, meaning a single instance of this handler can be added to multiple ChannelPipelines.
  2. All of the logic is delegated to the ChannelHandler: establishing connections, closing connections, receiving messages, and sending messages.
  3. When an IdleStateEvent is received, the channel is closed.

Interestingly, we still haven't seen the real processing logic here. Is the ChannelHandler the real processor? And what exactly is it?

In fact, NettyServer itself is the real channel handler. Recall the server startup code: final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); the this passed in is the handler, so NettyServer is the handler after all.
However, you will not find these callback methods on NettyServer itself; the logic lives in its parent classes, AbstractServer and AbstractPeer.
AbstractServer

  @Override
    public void send(Object message, boolean sent) throws RemotingException {
        Collection<Channel> channels = getChannels();
        for (Channel channel : channels) {
            if (channel.isConnected()) {
                channel.send(message, sent);
            }
        }
    }
    
    @Override
    public void connected(Channel ch) throws RemotingException {
        // if the server is closing or already closed
        if (this.isClosing() || this.isClosed()) {
            logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
            ch.close();
            return;
        }

        Collection<Channel> channels = getChannels();
        // check whether the connection limit has been reached
        if (accepts > 0 && channels.size() > accepts) {
            logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
            ch.close();
            return;
        }
        // delegate the connect event to the parent class
        super.connected(ch);
    }

    @Override
    public void disconnected(Channel ch) throws RemotingException {
        Collection<Channel> channels = getChannels();
        if (channels.isEmpty()) {
            logger.warn("All clients has disconnected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");
        }
        // delegate the disconnect event to the parent class
        super.disconnected(ch);
    }
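The accepts check in connected above boils down to a simple predicate, mirroring the condition in the code just shown (accepts <= 0 effectively disables the limit):

```java
// Sketch of AbstractServer's connection-limit rule: accepts <= 0 disables the
// limit; otherwise a new channel pushing the count over accepts is rejected.
class AcceptsLimitSketch {

    static boolean shouldReject(int accepts, int currentConnections) {
        return accepts > 0 && currentConnections > accepts;
    }
}
```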

AbstractPeer.java

   @Override
    public void connected(Channel ch) throws RemotingException {
        if (closed) {
            return;
        }
        handler.connected(ch);
    }

    @Override
    public void disconnected(Channel ch) throws RemotingException {
        handler.disconnected(ch);
    }

    @Override
    public void received(Channel ch, Object msg) throws RemotingException {
        if (closed) {
            return;
        }
        handler.received(ch, msg);
    }

    @Override
    public void caught(Channel ch, Throwable ex) throws RemotingException {
        handler.caught(ch, ex);
    }

As you can see, there is yet another handler inside. How many handlers are there, and where does this one come from? It is the one passed in when the NettyServer was first constructed. One thing worth knowing here: a Transporter is created through the Transporters utility (facade) class.

Specifically, through this method, Transporters#bind:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {}
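When several handlers are passed, Transporters wraps them into a single dispatching handler before binding. A simplified sketch of that facade idea, with handler types reduced to a BiConsumer; the real code uses ChannelHandlerDispatcher and then delegates to the adaptive Transporter:

```java
import java.util.List;
import java.util.function.BiConsumer;

// Sketch: many handlers collapse into one handler that forwards each event to
// all of them, mirroring what Transporters does before calling bind.
class HandlerDispatcherSketch {

    static BiConsumer<Object, Object> wrap(List<BiConsumer<Object, Object>> handlers) {
        if (handlers.size() == 1) {
            return handlers.get(0);
        }
        return (channel, message) -> handlers.forEach(h -> h.accept(channel, message));
    }
}
```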

Going up one more level, it is the Exchangers utility class that calls Transporters.

Going up further, we reach the Protocol layer.
(Figure: the call chain from the Protocol layer down to the transport layer)

So why is the processing logic placed in the protocol layer?
Let's take DubboProtocol as an example and see what logic it handles.

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            // omitted ...
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            // omitted ...
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            // omitted ...
        }

    };

All of these callbacks are eventually handled by the reply method below:

 @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            // look up the Invoker for this request
            Invoker<?> invoker = getInvoker(channel, inv);
            // callback invocation: check that the method exists
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                // omitted ...
            }
            RpcContext rpcContext = RpcContext.getContext();
            rpcContext.setRemoteAddress(channel.getRemoteAddress());
            // execute the invocation
            Result result = invoker.invoke(inv);
            // the new CompletableFuture-based async path, see https://www.cnblogs.com/cjsblog/p/9267163.html
            if (result instanceof AsyncRpcResult) {
                return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);

            } else {
                return CompletableFuture.completedFuture(result);
            }
        }
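The final branch bridges synchronous and asynchronous results into one CompletableFuture-returning signature. The idea can be sketched independently of Dubbo's Result types; as an assumed simplification, an async result is represented here directly by a CompletableFuture:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the sync/async bridging at the end of reply: async results pass
// their future through, synchronous ones are wrapped with completedFuture.
class ReplyResultSketch {

    @SuppressWarnings("unchecked")
    static CompletableFuture<Object> toFuture(Object result) {
        if (result instanceof CompletableFuture) {
            return (CompletableFuture<Object>) result;  // already asynchronous
        }
        return CompletableFuture.completedFuture(result);
    }
}
```

Either way the caller gets a future, so the Exchange layer can treat all replies uniformly.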

By now you may have noticed that the invoker is bound to the protocol: it is stored in the Protocol layer as a Map from serviceKey to Invoker. That is why looking up the invoker can only be implemented in the Protocol layer.
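That serviceKey-to-Invoker mapping can be sketched as a small registry. The types are simplified for illustration (an invoker is reduced to a plain Function); in DubboProtocol the map is the exporterMap and getInvoker rebuilds the serviceKey from the request's path, version, group, and port:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of the Protocol layer's exporter registry: serviceKey -> invoker,
// with a lookup that fails loudly for unknown services.
class InvokerRegistrySketch {

    private final Map<String, Function<String, String>> exporters = new ConcurrentHashMap<>();

    void export(String serviceKey, Function<String, String> invoker) {
        exporters.put(serviceKey, invoker);
    }

    Function<String, String> getInvoker(String serviceKey) {
        Function<String, String> invoker = exporters.get(serviceKey);
        if (invoker == null) {
            throw new IllegalStateException("Not found exported service: " + serviceKey);
        }
        return invoker;
    }
}
```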

The transport layer is really put to use when a service is exported; the sequence diagram below shows the details.

(Figure: sequence diagram of service export)

Steps 5 and 6 show when the corresponding initialization happens.

Summary

To quote a passage from the official documentation:

Remoting is the implementation of the Dubbo protocol; if you choose the RMI protocol, the entire Remoting layer goes unused. Remoting is further divided into the Transport layer and the Exchange layer. The Transport layer is only responsible for one-way message transmission and is an abstraction over Mina, Netty, and Grizzly; it can also be extended with UDP transport. The Exchange layer wraps Request-Response semantics on top of the transport layer.

From this passage we can see that not every protocol needs the Remoting layer. Because the Dubbo protocol sits on top of TCP/IP, it must handle TCP communication itself, so a server for the Dubbo protocol has to be built. If we use HTTP instead, the Remoting layer's job is already done for us by Tomcat or Jetty, and we can use them directly: when the protocol calls openServer, it simply creates an HttpServer rather than building its own server. So we can think of Remoting as the protocol's communication server.