zookeeper-watcher异步调用子服务
zookeeper=文件系统+通知机制
一、 文件系统
Zookeeper维护一个类似文件系统的数据结构:
每个子目录项如 NameService 都被称作为 znode,和文件系统一样,我们能够*的增加、删除znode,在一个znode下增加、删除子znode,唯一的不同在于znode是可以存储数据的。
有四种类型的znode:
1)PERSISTENT-持久目录节点
客户端与zookeeper断开连接后,该节点依旧存在
2) PERSISTENT_SEQUENTIAL-持久顺序节点
客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
3)EPHEMERAL-临时目录节点
客户端与zookeeper断开连接后,该节点被删除
4)EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
二、 通知机制
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。
我们可以自定义Watcher,如果是Boolean型变量,当为true时,则使用系统默认的Watcher,系统默认的Watcher是在Zookeeper的构造函数中定义的Watcher。参数中Watcher为空或者false,表示不启用Wather。
1、一次性触发器
客户端在Znode设置了Watch时,如果Znode内容发生改变,那么客户端就会获得Watch事件。例如:客户端设置getData("/znode1", true)后,如果/znode1发生改变或者删除,那么客户端就会得到一个/znode1的Watch事件,但是/znode1再次发生变化,那客户端是无法收到Watch事件的,除非客户端设置了新的Watch。
2、发送至客户端
Watch事件是异步发送到Client。Zookeeper可以保证客户端发送过去的更新顺序是有序的。例如:某个Znode没有设置watcher,那么客户端对这个Znode设置Watcher发送到集群之前,该客户端是感知不到该Znode任何的改变情况的。换个角度来解释:由于Watch有一次性触发的特点,所以在服务器端没有Watcher的情况下,Znode的任何变更就不会通知到客户端。不过,即使某个Znode设置了Watcher,且在Znode有变化的情况下通知到了客户端,但是在客户端接收到这个变化事件,但是还没有再次设置Watcher之前,如果其他客户端对该Znode做了修改,这种情况下,Znode第二次的变化客户端是无法收到通知的。这可能是由于网络延迟或者是其他因素导致,所以我们使用Zookeeper不能期望能够监控到节点每次的变化。Zookeeper只能保证最终的一致性,而无法保证强一致性。
3、设置watch的数据内容
Znode改变有很多种方式,例如:节点创建,节点删除,节点改变,子节点改变等等。Zookeeper维护了两个Watch列表,一个节点数据Watch列表,另一个是子节点Watch列表。getData()和exists()设置数据Watch,getChildren()设置子节点Watch。两者选其一,可以让我们根据不同的返回结果选择不同的Watch方式,getData()和exists()返回节点的内容,getChildren()返回子节点列表。因此,setData()触发内容Watch,create()触发当前节点的内容Watch或者是其父节点的子节点Watch。delete()同时触发父节点的子节点Watch和内容Watch,以及子节点的内容Watch。
Zookeeper Watcher的运行机制
1,Watch是轻量级的,其实就是本地JVM的Callback,服务器端只是存了是否有设置了Watcher的布尔类型。(源码见:org.apache.zookeeper.server.FinalRequestProcessor)
2,在服务端,在FinalRequestProcessor处理对应的Znode操作时,会根据客户端传递的watcher变量,添加到对应的ZKDatabase(org.apache.zookeeper.server.ZKDatabase)中进行持久化存储,同时将自己NIOServerCnxn做为一个Watcher callback,监听服务端事件变化
3,Leader通过投票通过了某次Znode变化的请求后,然后通知对应的Follower,Follower根据自己内存中的zkDataBase信息,发送notification信息给zookeeper客户端。
4,Zookeeper客户端接收到notification信息后,找到对应变化path的watcher列表,挨个进行触发回调
watcher分类:
defaultWatcher
existWatcher-->exist()
childWatcher-->getChildren()
dataWatcher-->getData()
watcher原理:
1、创建zookeeper实例的时候接收一个watcher参数,赋值给watchMnanger.defaultWatcher,此watcher与其它watcher不同,主要用于响应与链接状态转换有关的事件(建立链接,关闭链接等)
2、ZKWatchManager是客户端watcher管理器,负责跟踪多种watcher,每种类型的watcher将会被存在各自的Map中,key为path,value为Set<Watcher>。watcher由ZKWatchManager负责管理,并不会随请求发送给server,而只会发给server此请求类型是否注册了watch
3、在ZKDatabase中,包括一个DataTree,此dataTree持有对nodes以及相关的watcher的数据,server端,WatcherManager是管理client注册的watcher,其数据结构为HashSet<path,Set<Watcher>>
4、请求到达server之后,在FinalRequestProcessor中,将会处理各种请求,如果检测到request.getWatch()为true,即请求要求注册watch,那么将会把ServerCnxn和path关联起来,加入到WatherManager相应的列表中.
5、客户端的请求响应之后,由SendThread.readResponse()处理响应,如果响应code为成功且此请求中注册了watch,那么将会把此wath添加到响应的watch列表中。
6、DataTree持有2个WatchManager对象,分别为dataWatches用于管理注册data操作的watch,childWatches用于管理注册child操作的watch。
Zookeeper的Watcher机制主要包括客户端线程、客户端WatchManager和Zookeeper服务器三部分。在具体的流程上,客户端向Zookeeper服务器注册Watcher事件监听的同时,会将Watcher对象存储在 客户端WatchManager中。当Zookeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatchManager中取出对应的Watcher对象执行回调逻辑(process方法)。
流程:
1、不同子服务在zk上注册同一个znode节点
2、第一个子服务在节点上创建数据
3、第二个子服务监听节点的变化
4、当节点改变时做出对应的操作
方法一:watche机制回调
创建的是持久节点
需要自己将传递的参数由json转化为String
子服务定时调用getData方法,判断是否执行api
一个外部调用对应一个监听
--缺点:每一个接口对应一个定时任务
方法二:zk异步调用api
创建的是临时节点
可以直接传递json
主线程需要阻塞等待其他子服务的调用api
按create,exits,delete监听
--每种类型的回调方法只有一个,需要满足不同子服务的调用
原因
在调用一个接口的时候,可能会执行一些不必要的操作,有时这些操作等待响应的时间过长,如果同步调用接口,会使当前线程阻塞,大大影响了用户体验,所以利用zookeeper的watch机制,异步处理这些操作。
原理
1、Zookeeper提供了如下几种可以"注册watch"的操作:exist,getChildren,getData;而对于create,setData,delete是有可能触发"watcher"的操作
2、zookeeper的节点大小为1M
过程
1、创建子服务zookeeper,用于创建节点和对节点进行监听
2、启动服务的同时启动监听线程,创建定时任务读取zookeeper节点
3、其他子服务执行时调用zookeeper服务,创建一个持久时序节点
4、线程定时获取接口对应的子节点,根据子节点中的参数执行相应接口
5、读取数据之后删除节点
问题
1、其他子节点的session过期未被调用也会抛出异常?但是不影响自身节点的使用
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /esque2
/* 2、创建根节点下的子节点时会抛根节点不为空的异常?但是不影响子节点的创建
org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /confTest */(因为调用了delete方法去删除根节点)
/*3、多个线程同时对一个子服务的节点进行修改, 会对其产生影响*/(创建持久顺序节点)
4、如果写操作创建了临时节点,释放之后读操作无法获取
5、接口回调方法的执行依赖于主线程的阻塞时间,如果在回调方法中调用其他子服务的接口,会执行了一半之后停止
https://blog.csdn.net/fayeyiwang/article/details/54743201 Zookeeper-Watcher机制与异步调用原理
https://blog.csdn.net/hohoo1990/article/details/78617336 zookeeper 中 Watcher 通知机制