dubbo 源码学习笔记 (七) —— 远程调用模块
欢迎访问我的个人博客休息的风
dubbo远程调用模块,围绕着Invoker展开,调用方由DubboInvoker实现Invoker接口,并持有远程通讯的客户端。发送Invocation到服务端,服务端处理后,把结果返回。DubboInvoker包装为RpcResult,这是最里层做的事情。外层DubboInvoker还由ProtocolFilterWrapper进行包装,生成一个调用链;再由ProtocolListenerWrapper包装,增加监听;最后通过JavassistProxyFactory生成代理对象。实现方由JavassistProxyFactory生成Invoker,里层是由Wrapper包装的AbstractProxyInvoker实例。接下来外层由ProtocolFilterWrapper进行包装,生成一个调用链;再由ProtocolListenerWrapper包装,增加监听;最后通过DubboExporter对象持有该Invoker。
过程如下图:(看不清可在新页签查看)
接下来我们具体分析其实现过程的源码,首先看下整个模块的类图情况,如下图(看不清可在新页签查看):
类图中,主要关注与Invoker、Filter、ProxyFactory、Protocol这几个接口相关的实现类。服务端的入口在ServiceConfig的doExportUrlsFor1Protocol方法中。先通过“Invoker<?> invoker = proxyFactory.getInvoker”JavassistProxyFactory获取Invoker;
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
获取的包装后的Inovker再通过“Exporter<?> exporter = protocol.export(invoker)”代码获取DubboExporter。ProtocolListenerWrapper和ProtocolFilterWrapper在通过SPI获取Protocol的时候依次包装Protocol类,ProtocolListenerWrapper代码如下,会去生成一个ListenerExporterWrapper增加相应的监听。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), //spi获取所有可用的ExporterListener监听 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); }之后在ProtocolFilterWrapper生成调用链
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } //buildInvokerChain生成调用链 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; //获取服务端的过虑Filter List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { //循环Filters,串成一条链 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }在DubboProtocol.export方法里,封装为一个DubboExporter
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); //封装为一个DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //省略很多代码。。。至此,服务端的Invoker就完成了,接下来看客户端的Invoker。在ReferenceConfig.createProxy作为创建代理对象的入口。
private T createProxy(Map<String, String> map) { //省略很多代码 if (urls.size() == 1) { //会调用DubboProtocol.refer去获取DubboInvoker invoker = refprotocol.refer(interfaceClass, urls.get(0)); }//省略很多代码 }这里的protocol通过SPI获取时,会有ProtocolListenerWrapper和ProtocolFilterWrapper包装,包装过程跟服务端的类似,这里不在赘述。主要看下DubboProtocol是如何构建DubboInvoker的。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }这里会去创建DubboInvoker对象,在创建前会先去获取与服务端连接用的client。
private ExchangeClient[] getClients(URL url) { //是否共享连接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //如果connections不配置,则共享连接,否则每服务每连接 if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }
拿到客户端连接后,DubboInvoker就能通过客户端发送Invocation,到服务端处理后返回结果包装为RPCResult对象。
返回的DubboInvoker再经由前面说的两层包装后,会通过JavassistProxyFactory.getProxy获取其代理对象。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }至此,客户端的Invoker也分析完了。这里需要说一下,我们在@Reference或配置文件里配置的<dubbo:reference>获取的bean,就是这里返回的代理。通过这个代理,去与服务端进行连接,处理,然后客户端解析返回结果。这样,我们对dubbo的Rpc调用的invoker过程就能比较熟悉了。