dubbo 源码学习笔记 (二) —— dubbo发布服务的过程
欢迎访问我的个人博客休息的风
dubbo基础类ExtensionLoader和URL贯穿整个框架,掌握这两个类的思想和源码,就相当于对dubbo有了初步的认识。接着,我们来分析一下dubbo是如何发布一个服务的。
以ServiceConfig.doExportUrlsFor1Protocol为起点,在使用spring容器启动时,会调用该方法进行发布服务的一系列操作。简单来说,发布的过程可以简化为,服务转换为invoker->通过netty创建server,并将dubboInvoker放到工作线程池中->在zk上注册服务信息。这样在注册中心就会有服务相关的信息,在netty的工作线程中,就会有dubbo真实的invoker。需要使用的时候就在注册中心找信息,然后连接netty,去工作线程中拿一个线程执行。
以下是整理dubbo发布服务的整个源码调用过程:(看不清,请点击新的页签进行查看)
首先,在ServiceConfig.doExportUrlsFor1Protocol这个方法里是真正发布服务的入口。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //省略一些代码。。。。 //前面将各种配置信息都封装进map中,然后构建url对象 URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } //这里就是暴露服务的入口 String scope = url.getParameter(Constants.SCOPE_KEY); //配置为none不暴露 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { //方法里的url是如下形式: //injvm://127.0.0.1/com.zyy.service.inter.OrderLogService?anyhost=true&application=order-basic // &dubbo=2.5.5&generic=false&interface=com.zyy.service.inter.OrderLogService // &methods=queryOrderLogs,update,insert&pid=9304&side=provider×tamp=1507951011792&validation=true //表示的意思大概是:injvm 表示本地暴露, 访问地址是127.0.0.1, 访问路径是com.zyy.service.inter.OrderLogService //接下来是参数信息,存在URL的parameters的属性里 exportLocal(url); } //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务) if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } //registryURL的值示例如下: //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=order-basic&dubbo=2.5.5 // &pid=9304®istry=zookeeper×tamp=1507951010581 //这里的url表示意思如下: registry 表示当前的url是用来注册的; 访问地址是127.0.0.1 端口号是2181; 访问路径是com.alibaba.dubbo.registry.RegistryService //application=order-basic 应用名是:order-basic 等的信息存在url的parameters属性里 //这里的proxyFactory.getInvoker使用的是JavassistProxyFactory.getInvoker方法, Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //这里的protocol具体使用的是哪个,它的判断过程是: // com.alibaba.dubbo.common.URL url = arg0.getUrl(); // String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); // 再根据 extName 获取相应的spi类 //如上讲的,invoker里面会存有一个url,这个url的协议是registry, 所以protocol具体的类是RegistryProtocol Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); }这个方法首先是在构建URL对象,之后根据scope的值,决定暴露本地服务还是远程服务(其实这个scope为null,也就是本地和远程都会暴露);这里的proxyFactory.getInvoker使用的是JavassistProxyFactory.getInvoker方法,
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }先用Wrapper.getWrapper进行包装,再实例化AbstractProxyInvoker,也就是说“Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));”这句代码返回的Invoker的实例是AbstractProxyInvoker的实例。
“Exporter<?> exporter = protocol.export(invoker);”这里的protocol确定调用是哪个协议的过程比较不好理解。因为这个过程的代码是生成的,用Javassist去加载使用的。类名为Protocol$Adpative,export的方法为:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); }这个方法的入参就是上一句代码返回的Invoker,Invoker里面会存有一个url,这个url的协议值是registry, extName的值就是registry,通过spi的加载机制,可以确定protocol具体的类是RegistryProtocol。
在RegistryProtocol.export的方法里,简单的过程是:DubboProtocl启动netty,发布服务到工作线程(通用情况下是dubbo协议和netty做网络通讯)-> 往zk上注册信息和监听(注册中心一般使用zk) -> 返回一个new Exporter
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker //使用dubboProtocl启动netty,发布服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider //通过registryFactory获取一个Registry, final Registry registry = getRegistry(originInvoker); //对要注册到注册中心的url做一些处理 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //如果用zk,会在ZookeeperRegistry.doRegister里真正注册 registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); ///如果用zk,会在ZookeeperRegistry.doSubscribe里真正订阅监听 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } //省略一些代码。。。。这个方法里主要是发布的一个过程,真正的细节,还是在zookeeperRegistry注册,DubboProtocol启动netty,和发布服务。特别是启动netty网络层比较复杂,之后会单独分析。这里只做一个简单的过程分析。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); //封装为一个DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //省略一些代码 。。。。。。 //网络通讯的入口,会去默认用netty创建 openServer(url); return exporter; }在openServer的方法里,会去调用createServer方法,开启一个Server服务端通讯的入口。
private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { //通过spi机制,真正使用HeaderExchanger去bind,这里可以看一下requestHandler, // 这个就是之后客户端refer真正处理的地方 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } //省略一些代码。。。。requestHandler的代码如下:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; //从DubboExporter里获取Invoker Invoker<?> invoker = getInvoker(channel, inv); //如果是callback 需要处理高版本调用低版本的问题 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { //callback的处理,省略。。。。 } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); //反射调用,真正客户端方法调用的地方 return invoker.invoke(inv); }
在HeaderExchanger里,通过Transporters,让具体的协议与发布过程解耦。通过SPI机制,使用NettyTransporter去创建一个NettyServer,doOpen方法,开启一个netty服务器。如果有用过netty的,应该会比较熟悉这段代码。
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); //两个线程池,用于请求和工作处理 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { //使用适配,用DubboCodec去编码解码 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); //工作线程的处理handler pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress());这样,就启动一个netty的服务端了。并且也将真正处理的DubboProtocol的requestHandler绑定到netty的工作线程池里了。这样,如果要获取一个请求,就可以知道,最终的处理,是通过netty工作线程池里拿一个线程,去用requestHandler去处理,把处理结果再通过netty返回客户端。至于如何找到哪个机器上的哪个服务的,就是通过注册中心以及负载均衡去处理了。这篇博客主要是描述发布服务的整个过程,其中有些细节省略,关注点在整个过程是如何发布的。