Dubbo源码解析(十六) Dubbo Transporter
Transport 定义
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
这里的transport可以理解为一个协议的服务器实现,如tomca是http服务器,这里的就是dubbo协议服务器。
谈起传输层,大家应该都能详细OSI七层模型,那么在计算机网络中,是怎么描述传输层呢?
OSI 模型
In computer networking, the transport layer is a conceptual division of methods in the layered architecture of protocols in the network stack in the Internet protocol suite and the OSI model. The protocols of this layer provide host-to-host communication services for applications.[1] It provides services such as connection-oriented communication, reliability, flow control, and multiplexing.
图片来自网络,侵删
在计算机网络中,传输层是因特网协议套件和OSI模型中的网络堆栈中的协议的分层体系结构中的方法的概念划分。该层的协议为应用程序提供主机到主机的通信服务。如UDP、TCP。并且提供面向连接的通信,可靠性,流量控制和多路复用等服务。
在dubbo,这个Transporter就是对传输层的实现。它对于提供了dubbo服务间通讯的支持。有了它,各个服务就可以进行网络通信了,不再是信息孤岛了。
Transport
@SPI("netty")
public interface Transporter {
/**
* Bind a server.
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
/**
* Connect to a server.
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
通过这个接口定义可以知道:
- 通过Transporter可以知道dubbo使用的也是C/S架构
- Transport提供了创建服务端和客户端的功能
- 默认的Transporter是NettyTransporter
- 可以通过server/client、transport来配置server/client的类型,目前支持的类型有netty、mina等
那个Transporter到底是什么呢,下面对使用Netty实现的Transporter进行示例介绍
NettyServer
属性
首先来瞅瞅,都有什么属性,简要来几个说一说
field | desc |
---|---|
handler | 通道处理器,用于处理客户端的请求 |
codec | 编解码处理器,用于把客户端的请求和服务端的相应进行编解码,变为Request和Response对象 |
创建一个NettyServer
可以看到,啥也没做,直接调用的supper方法
public class NettyServer extends AbstractServer implements Server
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
在这里我们看下这个父类AbstractServer
干了什么
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = Constants.ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 允许接收的最大请求数
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 实际启动服务方法,模板方法,交由子类实现
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
上面初始化了服务器、本地地址以及服务器对外暴露地址、允许接收的最大请求accepts、空闲时间等
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
// 根据url获得
this.codec = getChannelCodec(url);
this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
总的来说初始化了,编解码类Codec、通讯超时时间timeout、建立连接超时时间connect.timeout
public AbstractPeer(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
以上就是启动一个Netty服务器之前的准备工作。但是刚才可能注意到在AbstractServer中有一个doOpen方法,下面一起瞅瞅怎么启动一个服务器的?
启动服务器
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
// Netty的日常初始化
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
// 配置解码器
.addLast("decoder", adapter.getDecoder())
// 配置编码器
.addLast("encoder", adapter.getEncoder())
// 配置服务器空闲检测器,如长时间没有读或写的时候,后台是有定时检测线程。条件符合后会发送一个空闲事件IdleStateEvent,这样下游处理器可以接受这个事件进行处理
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
以上可以看到就是一个很平常的对Netty的配置,总共有4个处理器,分别是编码器、解码器、服务空闲检测器、逻辑处理器(NettyServer),那么前面三个都比较好理解,但是对于逻辑处理器应该有点东西,接下来就瞅瞅通道处理器
通道处理器
@io.netty.channel.ChannelHandler.Sharable
public class NettyServerHandler extends ChannelDuplexHandler {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
private final URL url;
private final ChannelHandler handler;
public NettyServerHandler(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
public Map<String, Channel> getChannels() {
return channels;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
if (channel != null) {
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
}
handler.connected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()));
handler.disconnected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.sent(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
logger.info("IdleStateEvent triggered, close channel " + channel);
channel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.caught(channel, cause);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
从这个类上,可以看到以下几点
- Sharable 这个注解适用于标注一个channel handler可以被多个channel安全地共享。也就意味着每次使用这个Handler可以被多个ChannelPipelines使用
- 所有的逻辑交给了ChannelHandler去处理,比如建立连接、关闭连接、接受信息和发送信息
- 当接收到 IdleStateEvent 会把通道进行关闭
现在比较有意思的一点是,到这里还没有看到真实的处理逻辑器,ChannelHandler才是真正的处理逻辑?那又是个啥东西
其实NettyServer就是真实的通道处理器,不知道还记不记得在启动服务器那里,final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
这里的this,就是处理器,那么是不是明白了,原来本来NettyServer
就是处理器啊
但是会发现这些处理方法,并没有在NettyServer对应的方法啊,其实是在它的父类 AbstractServer AbstractPeer 里面有相关的逻辑
AbstractServer
@Override
public void send(Object message, boolean sent) throws RemotingException {
Collection<Channel> channels = getChannels();
for (Channel channel : channels) {
if (channel.isConnected()) {
channel.send(message, sent);
}
}
}
@Override
public void connected(Channel ch) throws RemotingException {
// 如果服务器已经被关闭
if (this.isClosing() || this.isClosed()) {
logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
return;
}
Collection<Channel> channels = getChannels();
// 判断请求是否满载
if (accepts > 0 && channels.size() > accepts) {
logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close();
return;
}
// 调用父类连接
super.connected(ch);
}
@Override
public void disconnected(Channel ch) throws RemotingException {
Collection<Channel> channels = getChannels();
if (channels.isEmpty()) {
logger.warn("All clients has disconnected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");
}
// 调用父类关闭连接
super.disconnected(ch);
}
AbstractPeer.java
@Override
public void connected(Channel ch) throws RemotingException {
if (closed) {
return;
}
handler.connected(ch);
}
@Override
public void disconnected(Channel ch) throws RemotingException {
handler.disconnected(ch);
}
@Override
public void received(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.received(ch, msg);
}
@Override
public void caught(Channel ch, Throwable ex) throws RemotingException {
handler.caught(ch, ex);
}
可以看到,这里面还有一个Handler,到底还有多少个Handler,这个Handler又是从哪里来呢,其实就是在我们刚开始进来的时候的NettyServer初始化的时候传递过来的,这里我们需要知道一个点,就是Transport初始化是通过Transporters这个工具类进行初始化的。
也是就是这个方法Transporters#bind
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {}
再往上翻,所以可以知道是 Exchangers 这个工具类调用的Transportors
再往上翻就是Protocol层
那么为什么处理逻辑会放到协议层呢?
首先以DubboProtocol 举例,看看这个到底处理了什么逻辑
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public void received(Channel channel, Object message) throws RemotingException {
}
@Override
public void connected(Channel channel) throws RemotingException {
}
@Override
public void disconnected(Channel channel) throws RemotingException {
}
};
这里面所有的方法都会由下面的reply方法进行统一处理
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
// 根据请求 获取Invoker
Invoker<?> invoker = getInvoker(channel, inv);
// callback调用,确认方法是否存在
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
// 省略 ...
}
RpcContext rpcContext = RpcContext.getContext();
rpcContext.setRemoteAddress(channel.getRemoteAddress());
// 执行逻辑
Result result = invoker.invoke(inv);
// 新的 CompletableFuture https://www.cnblogs.com/cjsblog/p/9267163.html
if (result instanceof AsyncRpcResult) {
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}
到这里,不是到大家是否发现,invoker这个东西是和协议绑定的,它是保存在Protocol层中的,是一个Map,保存serviceKey和Invoker 的关系。所以需要读取invoker的话只能在Protocol实现了。
其实整个传输层的使用还是在发布服务的时候,详细看下面的序列图
在第5步和第6步可看到对应的初始化时机。
总结
引一点段官网的话
Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。
从这段话里面可以发现,不是所有的都需要remoting这一层,因为这里dubbo协议使用的是tcp/ip层次的协议,所以首先需要进行tcp通信。所以需要开发一个dubbo协议对应的服务器。但是如果我们使用的是http,那么其实remoting这一层tomcat和jetty都已经帮我们实现好了。我们直接使用就好了。也就是在protocol进行openServer的时候,直接new HttpServer即可。而不是需要开发对应的服务器。所以我们可以认为这个Remoting就是协议通讯服务器。