Spring Cloud 注册中心zookeeper

大家好,我是杰哥,前两次文章主要讲述了关于Zookeeper的服务端机制

那么今天,就跟着杰哥,转到客户端的角度,通过跟踪源码,进一步揭开zooKeeper的神秘面目吧~

一 概况

大致了解

首先,通过一个图,让大家对于zookeeper的工作流有一个大概的印象

Spring Cloud 注册中心zookeeper

1)图中 包含客户端ClientSendThreadEventThreadServer以及Watcher五个重要角色

2)SendThread负责将ZooKeeper的请求信息封装成一个Packet,发送给 Server ;并维持同Server的心跳

3)Server:处理不同请求,返回response给EventThread

4)EventThread负责解析通过SendThread得到的Response,最后发送给Watcher

5)Watcher:通过调用processEvent进行具体事件处理

好了,有了一个大致印象了 ,那下面的源码环节将会很easy了~

二 源码

具体探究

细心的你一定发现了,在zookeeper的客户端脚本 zkCli.sh中,我们发现,它实际上是通过加载org.apache.zookeeper.ZooKeeperMain启动的

Spring Cloud 注册中心zookeeper

那么,我们就先从ZooKeeperMain启动类进入zookeeper客户端源码的探索之旅吧~

查看ZooKeeperMainmain()方法,看到该包含两个步骤:

Spring Cloud 注册中心zookeeper

  1. 构造ZooKeeperMain对象并建立连接

  2. 读取终端输入并解析命令

来看看这两个步骤具体是如何处理的~

01. 构造对象 建立连接

1)进入ZookeeperMain()

Spring Cloud 注册中心zookeeper

初始化命令参数各个可选项,调用connectToZK()方法连接到server端

2)进入方法conectToZK()

Spring Cloud 注册中心zookeeper

构造Zookeeper对象,建立连接

3)继续往下跟踪,进入Zookeeper(...)

Spring Cloud 注册中心zookeeper

创建ClientCnxn对象,并调用了它的start()方法

我们分别看看这两个步骤分别做了什么

4)先看看 这个对象初始化的时候都干了什么 

Spring Cloud 注册中心zookeeper

我们看到,它分别初始化了很多参数,包括主机列表连接超时时间读取超时时间等。最后还初始化了客户端的2个核心线程:SendThreadEventThread

小插曲:顺便说一句,这两个线程是zookeeper的重要角色,也将是我们今天的主角

5)然后呢,再看看:cnxn.start()方法

这个方法的作用就是:分别启动这两个线程

Spring Cloud 注册中心zookeeper

02. 读取并解析命令

1)进入zookeeper的run()方法

Spring Cloud 注册中心zookeeper

我们看到,该方法通过反射调用jline.ConsoleReader类以及该类的addCompletor()方法,对终端输入进行读取。然后调用executeLine()方法,逐个执行单行命令

2)进入executeLine()方法


Spring Cloud 注册中心zookeeper

我们看到该方法首先通过方法cl.parseCommand(line)方法对命令行进行解析,然后调用processCms(cl)方法执行各个命令

3)进入processCms(cl)方法

Spring Cloud 注册中心zookeeper

该方法再调用processZKCmd()方法,对于抛出的异常分别进行分类处理

4)进入processZKCmd()方法

Spring Cloud 注册中心zookeeper

由于方法比较长,我们分为两部分来查看

a 第一部分,我们看到,对于quitredohistoryprintwatches以及connect方法,直接进行相应处理

Spring Cloud 注册中心zookeeper

到了第二部分的方法,包括对节点的各个操作,则需要在连接建立成功的情况下执行,具体如何执行,我们再来一探究竟~

ZooKeeper.create()为例

1) 上图中,如果指令为create,就会调用zk.create(..)方法

2)进入create(...)方法

我们看到create命令被封装成了一个 CreateRequest对象request,然后调用submitRequest()进行节点创建

Spring Cloud 注册中心zookeeper

3) 进入submitRequest(...)

Spring Cloud 注册中心zookeeper

我们看到zookeeper通过调用queuePacket(...)方法将Request封装成一个Packet

4)进入queuePacket(...)方法

Spring Cloud 注册中心zookeeper

将packet加入SendThread的outgoingQueue队列中,等待执行。并唤醒selector

接下来,在SendThread.run()的while循环中,ZooKeeper将会通过doTransport()将存放在outgoingQueue中的Packet包发送给 Server

03. SendThread

我们在前面有提到,SendThread 的主要作用是:

  • Packet包发送给Server

  • 维持Client和Server之间的心跳,确保 session 存活

现在让我们从源码出发,看看SendThread究竟做了哪些工作

SendThread是一个线程类,因此我们进入其run()方法,看看它的启动流程

1)先整体来看,run()方法是通过一个while循环,进行具体任务处理

Spring Cloud 注册中心zookeeper

Spring Cloud 注册中心zookeeper

若状态为关闭或者权限验证失败,则关闭socket连接,并由eventThread处理关闭连接事件

a 与server建立连接

Spring Cloud 注册中心zookeeper

b 判断超时

Spring Cloud 注册中心zookeeper

可以看到会分别判断readTimeoutconnetTimeout 两个超时时间,一旦发现链接超时,则抛出异常,终止SendThread

c 发送心跳

在没有超时且为连接状态的情况下,若已经达到心跳间隔时间,或者在最大时间间隔MAX_SEND_PING_INTERVAL内还没有发送packet。会再次发送心跳数据,避免访问超时

Spring Cloud 注册中心zookeeper

d 发送指令

Spring Cloud 注册中心zookeeper

整体来看,SendThread的主要任务即为:

  • 创建同 Server 之间的 socket 链接

  • 判断链接是否超时

  • 定时发送心跳任务

  • 将ZooKeeper指令发送给Server

我们主要来看看建立连接的过程与发送指令的过程   

2) 与 Server 的长链接

a 进入startConnect()

通过调用抽象类ClientCnxnSocket的connect()方法进行socket连接,该抽象类的默认实现是ClientCnxnSocketNIO类。

Spring Cloud 注册中心zookeeper

b 在ClientCnxnSocketNIO.connect()中我们可以看到,与Server之间创建了一个socket链接,并调用registerAndConnect()方法注册并连接到主机地址上

Spring Cloud 注册中心zookeeper

c 进入registerAndConnect(...)方法。

Spring Cloud 注册中心zookeeper

可以看到,zookeeper会首先将sock注册到selector,然后调用sock.connect()连接服务器,判断当前是否是初次连接。若是,则进入初始化连接primeConnection()方法

d 进入primeConnection()方法

首先设置首次连接为false,然后初始化sessionId,并建立连接

Spring Cloud 注册中心zookeeper

e 接下来,将该连接事件组合成packet对象,并添加到发送队列中

需要注意的是,连接事件的requestHeader请求头)为null

Spring Cloud 注册中心zookeeper

f  设置为可读可写

调用clientCnxnSocket.enableReadWriteOnly()开启监听事件的读写功能

Spring Cloud 注册中心zookeeper

那么到现在为止,已成功完成连接。接下来就要执行doTransport() 了~

3)   发送 ZooKeeper 指令

a 进入doTransPort()

Spring Cloud 注册中心zookeeper

该方法,首先会确保连接成功建立,调用doIO()方法进行处理,然后调用findSendablePacket(...)方法将连接事件的packet放到outgoingQueue头部

Spring Cloud 注册中心zookeeper

b 进入doIO()方法

该方法会分别判断key值是否是可读、或者可写的,分别进行读、写事件的处理

Spring Cloud 注册中心zookeeper

c readable()

先来看看对读操作的处理

Spring Cloud 注册中心zookeeper

调用readResponse()将其加到eventTread

d 进readResonse()方法

小插曲-Tips:

 zookeeper的消息分为三种:

  • ping 消息:XID=-2

  • auth认证消息:XID=-4

  • 订阅的消息:XID=-1

订阅的消息,也就是节点变化的通知消息。比如子节点变化、节点内容变化

Spring Cloud 注册中心zookeeper

我们看到readResonse()方法获取到这类消息,通过eventThread.queueEvent() 将消息推入事件队列waitingEvents,等待后续处理

Spring Cloud 注册中心zookeeper

e writable()

进入第二部分,zookeeper对于写操作的处理

Spring Cloud 注册中心zookeeper

锁定outgoningQueue进行如下处理:将事件封装成packet对象,设置事件的xid,若!p.bb.hasRemaining()为true,表示该事件已发送成功,那么删除outgoingQueue中的事件,并将该事件添加到pendingQueue中,等待后续处理

04. EventThread

进入EventThread的run()方法

Spring Cloud 注册中心zookeeper

我们看到该方法对获取到的事件通过方法processEvent()方法进行处理

因此我们就主要来看看processEvent()方法的逻辑

Spring Cloud 注册中心zookeeper

我们看到,该方法首先会判断事件是否是WatcherSetEventPair的实例

若是,则依次调用 watcher.process(pair.event)进行处理

否则就会以异步回调方式处理。根据 p.response() 判断为哪种响应类型,执行响应的回调方法 processResult()

好了~ zooKeeper客户端的源码还是比较简单的吧,分析到这里,也基本搞清楚了它的具体处理流程

根据以上的分析,我们就可以把最开始的zookeeper的工作流再细化一点,变成下面这个样子:

Spring Cloud 注册中心zookeeper

图中细化到可以看到SendThread中处理过程包含的outgoingQueuependingQueue,并且SendThread和EventThread是通过Clientcnxn来控制处理的

处理流程为:

1)Client发起request给Zookeeper类

2)Zookeeper类将处理该request,并将其放入outgoingQueue(发送队列)

3)Zookeeper Server端处理发送队列中的该事件,并将该事件放到待处理队列PendingQueue中

4)由EventThreadt消费该pendingQueue中的该事件

5)分发给不同的watcher 进行事件的处理

三 总结

总而言之

也就是说,Client中在终端输入指令后,首先会被封装成一个Request请求。然后通过submitRequest,进一步被封装成Packet包,提交给SendThread处理

SendThread再通过doTransport()Packet发送给Server,并通过readResponse获取结果,解析成一个Event,再将Event加入EventThread的队列中等待执行

EventThread通过processEvent消费队列中的Event事件

是不是更深入理解啦~

那么到现在为止,我们的注册中心章节之-zookeeper篇到这里就结束啦~

嗯,就这样。每天学习一点,时间会见证你的强大~