Dubbo源码分析之 服务注册(例如:Zookeeper 为注册中心)

参考:http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html

官方文档是先分析服务导出,看了看官方文档的源码分析,服务导出是在是太复杂了,咱们暂且先看看服务注册,

本节我们来分析服务注册过程,服务注册操作对于 Dubbo 来说不是必需的,通过服务直连的方式就可以绕过注册中心。但通常我们不会这么做,直连方式不利于服务治理,仅推荐在测试服务时使用。对于 Dubbo 来说,注册中心虽不是必需,但却是必要的。因此,关于注册中心以及服务注册相关逻辑,我们也需要搞懂。

本篇内容以 Zookeeper 注册中心作为分析目标,其他类型注册中心大家可自行分析。下面从服务注册的入口方法开始分析,我们把目光移到 RegistryProtocol 的 export 方法上。

如下:

/**
 * 此方法是dubbo服务导出与注册的方法
 * @param originInvoker
 * @param <T>
 * @return
 * @throws RpcException
 */
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // ${导出服务} 
    // 省略其他代码
    //to judge if we need to delay publish

    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        //注册服务
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

更进register方法

/**
 * register 方法包含两步操作,第一步是获取注册中心实例,第二步是向注册中心注册服务。接下来分两节内容对这两步操作进行分析。
 * @param registryUrl
 * @param registeredProviderUrl
 */
public void register(URL registryUrl, URL registeredProviderUrl) {
    // 获取 Registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // 注册服务
    registry.register(registeredProviderUrl);
}

1、创建注册中心

本节内容以 Zookeeper 注册中心为例进行分析。下面先来看一下 getRegistry 方法的源码,这个方法由 AbstractRegistryFactory 实现。

@Override
public Registry getRegistry(URL url) {
    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = url.toServiceStringWithoutResolving();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // 访问缓存
        Registry registry = REGISTRIES.get(key);
        //看是否缓存是否命中,命中就直接返回
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        // 缓存未命中创建 Registry 实例
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // 写回入缓存
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

跟进createRegistry方法,看看是怎么实现的

Dubbo源码分析之 服务注册(例如:Zookeeper 为注册中心)

是一个模板方法,具体功能由子类实现。

Dubbo源码分析之 服务注册(例如:Zookeeper 为注册中心)

可以看出注册有很多,比如redis等,这里我就以Zk的为例。

点进去看zk的实现

@Override
public Registry createRegistry(URL url) {
    // 创建 ZookeeperRegistry
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

直接调用了一个ZookeeperRegistry的构造函数。继续跟进如下:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 获取组名,默认为 dubbo
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        // group = "/" + group
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
    /**
     * 这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,
     * 默认为 CuratorZookeeperTransporter。
     */
    zkClient = zookeeperTransporter.connect(url);
    // 添加状态监听器
    zkClient.addStateListener(state -> {
        //判断状态是否是再次连接
        if (state == StateListener.RECONNECTED) {
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}

在上面的代码代码中,我们重点关注 ZookeeperTransporter 的 connect 方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper 客户端的创建过程。

这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter。下面我们到 CuratorZookeeperTransporter 中看一看。

跟进connect方法

@SPI("curator")
public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}

这是dubbo的SPI调用,找到实现类

@Override
public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    //字段定义了zookeeper服务器,包括协议、主机、端口、用户名、密码
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    //避免创建时出现并发问题,所以添加lock
    synchronized (zookeeperClientMap) {
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }

        zookeeperClient = createZookeeperClient(toClientURL(url));
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}

CuratorZookeeperClient 构造方法主要用于创建和启动 CuratorFramework 实例。以上基本上都是 Curator 框架的代码。

不熟悉Curator框架的可以看一篇这个博客:地址:https://blog.csdn.net/qq_34021712/article/details/82872311

本节分析了 ZookeeperRegistry 实例的创建过程。现在注册中心实例创建好了,接下来要做的事情是向注册中心注册服务,我们继续往下看。

2、节点创建

以 Zookeeper 为例,所谓的服务注册,本质上是将服务配置数据写入到 Zookeeper 的某个路径的节点下。

阅读服务注册的代码。服务注册的接口为 register(URL),这个方法定义在 FailbackRegistry 抽象类中。代码如下:

//服务注册的接口为 register(URL)
@Override
public void register(URL url) {
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        // 模板方法,由子类实现
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        // 获取 check 参数,若 check = true 将会直接抛出异常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        // 记录注册失败的链接
        addFailedRegistered(url);
    }
}

上诉代码,我们关注doRegister方法,这个也是个模板方法,毕竟do开头的方法一般都是具体的实现逻辑

Dubbo源码分析之 服务注册(例如:Zookeeper 为注册中心)

找到Zk实现下,代码如下:

@Override
public void doRegister(URL url) {
    try {
        // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
        //   /${group}/${serviceInterface}/providers/${url}
        // 比如
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

上诉代码主要是看create方法,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,跟进如下:

@Override
public void create(String path, boolean ephemeral) {
    if (!ephemeral) {
        //判断节点路径是否已经被包含了
        if(persistentExistNodePath.contains(path)){
            return;
        }
        // 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
        if (checkExists(path)) {
            persistentExistNodePath.add(path);
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // 递归创建上一级路径
        create(path.substring(0, i), false);
    }
    // 根据 ephemeral 的值创建临时或持久节点
    if (ephemeral) {
        //临时节点
        createEphemeral(path);
    } else {
        //持久节点
        createPersistent(path);
        //添加持久节点
        persistentExistNodePath.add(path);
    }
}

使用过zk的走着的zk节点有两种,一种是临时节点,重启就消失,一种是持久节点,不删除会一直存在

跟进createEphemeral方法,此是创建临时节点

@Override
public void createEphemeral(String path) {
    try {
        // 通过 Curator 框架创建节点
        client.create()
                .withMode(CreateMode.EPHEMERAL)//节点类型,持久节点
                .forPath(path);
    } catch (NodeExistsException e) {
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

跟进createPersistent方法,此是创建持久节点

//创建持久节点
@Override
public void createPersistent(String path) {
    try {
        client.create()
                .forPath(path);
    } catch (NodeExistsException e) {
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

 

到此本篇也就结束了,主要是参考了官网http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html文档,自己也在学习,跟着记录一下。