zookeeper-watcher异步调用子服务

zookeeper=文件系统+通知机制

一、 文件系统

Zookeeper维护一个类似文件系统的数据结构:

每个子目录项如 NameService 都被称作为 znode,和文件系统一样,我们能够*的增加、删除znode,在一个znode下增加、删除子znode,唯一的不同在于znode是可以存储数据的。

有四种类型的znode:

1)PERSISTENT-持久目录节点

客户端与zookeeper断开连接后,该节点依旧存在

2) PERSISTENT_SEQUENTIAL-持久顺序节点

客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号

3)EPHEMERAL-临时目录节点

客户端与zookeeper断开连接后,该节点被删除

4)EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点

客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

 

zookeeper-watcher异步调用子服务

 

 

二、 通知机制

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。

 

我们可以自定义Watcher,如果是Boolean型变量,当为true时,则使用系统默认的Watcher,系统默认的Watcher是在Zookeeper的构造函数中定义的Watcher。参数中Watcher为空或者false,表示不启用Wather。

 

1、一次性触发器

客户端在Znode设置了Watch时,如果Znode内容发生改变,那么客户端就会获得Watch事件。例如:客户端设置getData("/znode1", true)后,如果/znode1发生改变或者删除,那么客户端就会得到一个/znode1的Watch事件,但是/znode1再次发生变化,那客户端是无法收到Watch事件的,除非客户端设置了新的Watch。

 

2、发送至客户端

Watch事件是异步发送到Client。Zookeeper可以保证客户端发送过去的更新顺序是有序的。例如:某个Znode没有设置watcher,那么客户端对这个Znode设置Watcher发送到集群之前,该客户端是感知不到该Znode任何的改变情况的。换个角度来解释:由于Watch有一次性触发的特点,所以在服务器端没有Watcher的情况下,Znode的任何变更就不会通知到客户端。不过,即使某个Znode设置了Watcher,且在Znode有变化的情况下通知到了客户端,但是在客户端接收到这个变化事件,但是还没有再次设置Watcher之前,如果其他客户端对该Znode做了修改,这种情况下,Znode第二次的变化客户端是无法收到通知的。这可能是由于网络延迟或者是其他因素导致,所以我们使用Zookeeper不能期望能够监控到节点每次的变化。Zookeeper只能保证最终的一致性,而无法保证强一致性。

 

3、设置watch的数据内容

Znode改变有很多种方式,例如:节点创建,节点删除,节点改变,子节点改变等等。Zookeeper维护了两个Watch列表,一个节点数据Watch列表,另一个是子节点Watch列表。getData()和exists()设置数据Watch,getChildren()设置子节点Watch。两者选其一,可以让我们根据不同的返回结果选择不同的Watch方式,getData()和exists()返回节点的内容,getChildren()返回子节点列表。因此,setData()触发内容Watch,create()触发当前节点的内容Watch或者是其父节点的子节点Watch。delete()同时触发父节点的子节点Watch和内容Watch,以及子节点的内容Watch。

 

 

 

Zookeeper Watcher的运行机制

1,Watch是轻量级的,其实就是本地JVM的Callback,服务器端只是存了是否有设置了Watcher的布尔类型。(源码见:org.apache.zookeeper.server.FinalRequestProcessor)

2,在服务端,在FinalRequestProcessor处理对应的Znode操作时,会根据客户端传递的watcher变量,添加到对应的ZKDatabase(org.apache.zookeeper.server.ZKDatabase)中进行持久化存储,同时将自己NIOServerCnxn做为一个Watcher callback,监听服务端事件变化

3,Leader通过投票通过了某次Znode变化的请求后,然后通知对应的Follower,Follower根据自己内存中的zkDataBase信息,发送notification信息给zookeeper客户端。

4,Zookeeper客户端接收到notification信息后,找到对应变化path的watcher列表,挨个进行触发回调

 

 

watcher分类:

defaultWatcher

existWatcher-->exist()

childWatcher-->getChildren()

dataWatcher-->getData()

 

watcher原理:

1、创建zookeeper实例的时候接收一个watcher参数,赋值给watchMnanger.defaultWatcher,此watcher与其它watcher不同,主要用于响应与链接状态转换有关的事件(建立链接,关闭链接等)

2、ZKWatchManager是客户端watcher管理器,负责跟踪多种watcher,每种类型的watcher将会被存在各自的Map中,key为path,value为Set<Watcher>。watcher由ZKWatchManager负责管理,并不会随请求发送给server,而只会发给server此请求类型是否注册了watch

3、在ZKDatabase中,包括一个DataTree,此dataTree持有对nodes以及相关的watcher的数据,server端,WatcherManager是管理client注册的watcher,其数据结构为HashSet<path,Set<Watcher>>

4、请求到达server之后,在FinalRequestProcessor中,将会处理各种请求,如果检测到request.getWatch()为true,即请求要求注册watch,那么将会把ServerCnxn和path关联起来,加入到WatherManager相应的列表中.

5、客户端的请求响应之后,由SendThread.readResponse()处理响应,如果响应code为成功且此请求中注册了watch,那么将会把此wath添加到响应的watch列表中。

6、DataTree持有2个WatchManager对象,分别为dataWatches用于管理注册data操作的watch,childWatches用于管理注册child操作的watch。

 

 

Zookeeper的Watcher机制主要包括客户端线程、客户端WatchManager和Zookeeper服务器三部分。在具体的流程上,客户端向Zookeeper服务器注册Watcher事件监听的同时,会将Watcher对象存储在 客户端WatchManager中。当Zookeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatchManager中取出对应的Watcher对象执行回调逻辑(process方法)。

 

 

流程:

1、不同子服务在zk上注册同一个znode节点

2、第一个子服务在节点上创建数据

3、第二个子服务监听节点的变化

4、当节点改变时做出对应的操作

 

 

 

 

方法一:watche机制回调

创建的是持久节点

需要自己将传递的参数由json转化为String

子服务定时调用getData方法,判断是否执行api

一个外部调用对应一个监听

--缺点:每一个接口对应一个定时任务

 

 

方法二:zk异步调用api

创建的是临时节点

可以直接传递json

主线程需要阻塞等待其他子服务的调用api

按create,exits,delete监听

--每种类型的回调方法只有一个,需要满足不同子服务的调用

 

原因

在调用一个接口的时候,可能会执行一些不必要的操作,有时这些操作等待响应的时间过长,如果同步调用接口,会使当前线程阻塞,大大影响了用户体验,所以利用zookeeper的watch机制,异步处理这些操作。

 

原理

1、Zookeeper提供了如下几种可以"注册watch"的操作:exist,getChildren,getData;而对于create,setData,delete是有可能触发"watcher"的操作

2、zookeeper的节点大小为1M

 

过程

1、创建子服务zookeeper,用于创建节点和对节点进行监听

2、启动服务的同时启动监听线程,创建定时任务读取zookeeper节点

3、其他子服务执行时调用zookeeper服务,创建一个持久时序节点

4、线程定时获取接口对应的子节点,根据子节点中的参数执行相应接口

5、读取数据之后删除节点

 

 

问题

1、其他子节点的session过期未被调用也会抛出异常?但是不影响自身节点的使用

org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /esque2

/* 2、创建根节点下的子节点时会抛根节点不为空的异常?但是不影响子节点的创建

org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /confTest */(因为调用了delete方法去删除根节点)

/*3、多个线程同时对一个子服务的节点进行修改, 会对其产生影响*/(创建持久顺序节点)

4、如果写操作创建了临时节点,释放之后读操作无法获取

5、接口回调方法的执行依赖于主线程的阻塞时间,如果在回调方法中调用其他子服务的接口,会执行了一半之后停止

 

 

 

https://blog.csdn.net/fayeyiwang/article/details/54743201 Zookeeper-Watcher机制与异步调用原理

https://blog.csdn.net/hohoo1990/article/details/78617336 zookeeper 中 Watcher 通知机制