Dubbo, 协议源码-rpc 模块

rpc模块是远程调用模块,抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。这个模块的学习将使我们对服务的发布和调用更加清晰。

重点学习这几个类的作用和它们的实现类:ProxyFactory、Invoker、Protocol、Exporter。

Invoker

这是一个可执行的对象,能够根据方法,参数得到执行结果,代码如下:

public interface Invoker<T> extends Node { Class<T> getInterface(); Result invoke(Invocation invocation) throws RpcException;}

看一下Invocation的实现类:RpcInvocation。简略代码如下:

public interface Invocation { String getMethodName(); Class<?>[] getParameterTypes(); Object[] getArguments(); Map<String, String> getAttachments(); String getAttachment(String key); String getAttachment(String key, String defaultValue); Invoker<?> getInvoker();}

其实也只是提供了Invocation所需的参数而已。我们把目光又放回Invoker。

Invoker的执行过程分为三种类型

(1)本地执行类的Invoker

(2)远程通信执行类的Invoker

(3)多个(2)的Invoker聚合成的集群版的Invoker(需要设计到负载均衡)

ProxyFactory

对于server端,主要负责将服务统一进行包装成一个Invoker,这些Invoker通过反射来执行具体的对象的方法。

@SPI("javassist")public interface ProxyFactory { @Adaptive({Constants.PROXY_KEY}) <T> T getProxy(Invoker<T> invoker) throws RpcException; @Adaptive({Constants.PROXY_KEY}) <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;}

实现类主要有JdkProxyFactory、JavassistProxyFactory。默认是JavassistProxyFactory。

Protocol

我们发布服务的第一个过程是将服务封装成一个本地执行的invoker,执行服务这个就是执行这个invoker(调用这个invoker的invoke方法),通过反射执行。

但是我们看到invoke方法的参数-invocation,这个参数是如何得来的?

服务端的Protocol需要根据指定的协议对外公布服务,当客户端根据协议调用这个服务时候,将用户传递的invocation参数交给invoker来执行,所以Protocol加入了远程通信协议这一块,根据用户的请求获取参数Invocation。

@Extension("dubbo")public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; void destroy();

服务发布的第二步是暴露invoker。

Exporter<?> exporter = protocol.export(invoker);

暴露过程即根据invoker的URL的配置信息来最终选择Protocol实现,默认是dubbo,扩展实现即DubboProtocol,然后再对DubboProtocol进行依赖注入,进行wrap包装。先来看看Protocol的实现情况:

可以看到在返回DubboProtocol之前,经过了ProtocolFilterWrapper(核心方法是buildInvokerChain,构建Chain)、ProtocolListenerWrapper、RegistryProtocol的包装。

包装时候的是装饰模式,类似AOP功能

当服务发布时候会先经过RegistryProtocol,这个类的主要功能如下:

1,利用内部的Protocol即DubboProtocol,将服务进行导出

2,根据注册中心的registryUrl获取注册服务Registry,然后将serviceUrl注册到注册中心上,供客户端订阅

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl() // export service. String key = serviceKey(url) DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap) exporterMap.put(key, exporter) //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT) Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false) if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY) if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ if (logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")) } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods) } } openServer(url) return exporter }

 

创建一个DubboExporter,封装invoker。然后根据url的port、path(接口的名称)、版本号、分组号作为key,将DubboExporter存至Map.

首先根据Invoker的url获取ExchangeServer通信对象(负责与客户端的通信模块),以url中的host和port作为key存至Map.

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }};

可以看到在获取到Invocation参数后,调用getInvoker(channel, inv)来获取本地Invoker。获取过程就是根据channel获取port,根据Invocation inv信息获取要调用的服务接口、版本号、分组号等,以此组装成key,

public interface Exporter<T> { Invoker<T> getInvoker(); void unexport();}

包含了一个Invoker对象。一旦想撤销该服务,就会调用Invoker的destroy()方法,同时清理上述exporterMap中的数据。对于RegistryProtocol来说就需要向注册中心撤销该服务.

 

DubboCodec

这个类是上一篇遗留的,当初涉及到编解码问题,接下来就来解读dubbo传输的底层协议组成以及它的编码解码过程。

1,传输协议

协议格式

Dubbo, 协议源码-rpc 模块

 

3.bodydata

是消息传递的真正内容,body的占用的字节大小由协议头后四位保存。

 

4,序列化Request和Response

@Override protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { RpcInvocation inv = (RpcInvocation) data out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION)) out.writeUTF(inv.getAttachment(Constants.PATH_KEY)) out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)) out.writeUTF(inv.getMethodName()) out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())) Object[] args = inv.getArguments() if (args != null) for (int i = 0 out.writeObject(encodeInvocationArgument(channel, inv, i)) } out.writeObject(inv.getAttachments()) }

5.编码:

Dubbo, 协议源码-rpc 模块

 

5.解码:

Dubbo, 协议源码-rpc 模块