Zookeeper简单介绍及实现一个最简单的分布式锁
目录
前言
首先申明,本文是基于zkclient客户端实现的分布式锁(即选主),利用的是Zookeeper的临时节点在宕机后被删除这一特性实现(简单互斥锁),所以整体实现很简单会带来可能存在的Zookeeper性能问题
一般来说分布式系统中并不会采用该方式来实现分布式锁,该方法只适用于节点较少的项目,否则由于简单互斥锁的特性,每次抢锁都会造成大量的进程去竞争,并且当主锁挂掉时,zk会发通知给其余所有进程:告诉他们是时候该抢锁了,当集群数过大时,抢锁和通知过程则产生zk中的羊群效应。总而言之:两三个节点的项目可以使用该方法,多集群项目实现分布式锁一般采用的是Zookeeper的互斥锁即有序节点实现,后续更新
1、Zookeeper是什么?
在给出具体实现时,还是先记录下关于该简单分布式锁学习的大致过程,Zookeeper是一个开源的分布式协调服务,分布式应用程序可以基于Zookeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。下图为Zookeeper整体架构图:
Zookeeper集群由一组Server节点组成,其中只会存在一个主节点即Leader,另外的均为从节点即Follower,每个Server节点均可以独立实现读请求,当某个Server接收到写请求时,会先将该请求转发到Leader节点,由Leader节点执行然后同步更新至其余从节点。
2、Zookeeper的数据模型
Zookeeper将数据保存在内存中,这意味着其可以实现高吞吐量和低延迟,其数据模型如上图所示,Zookeeper的数据模型和UNIX的文件系统非常类似,整体可以看成是一棵树,每个节点称为一个ZNode,每个ZNode都可以通过其路径唯一表示,例如上图中第三层第一个,可以表示为/app1/c1。在每个ZNode还可以存储少量的数据(默认是1m,可以通过配置修改,通常不建议在ZNode上存储大量的数据),这个特性非常有用。另外,每个ZNode上还存储了其ACL(Access Control Lists)信息,这里需要注意,虽说ZNode的树形结构跟Unix文件系统很类似,但是其ACL与Unix文件系统是完全不同的,每个ZNode的ACL都是独立的,子结点不会继承父结点。
3、Zookeeper基本概念
ZNode:
在谈到分布式的时候,我们通常所说的”节点“是指组成集群的每一台机器,然鹅,在Zookeeper中,”节点“分为两类:
- 第一类同样是指集群中的每台机器,我们称之为机器节点
- 第二类则是指组成数据模型中的数据单元,我们称之为数据节点-ZNode
ZNode的大致介绍在数据模型中已经阐述,ZNode根据其本身的特性,又可以分为以下两类:
- Regular ZNode:常规型ZNode,又称持久节点,持久即一旦创建除非用户显式的删除否则将一直存在
- Ephemeral ZNode:临时型ZNode,用户创建它之后,可以显示的删除,也可以等待创建它的Session结束后,由Zookeeper Server自动删除
ZNode还有一个Sequential的特性,如果创建的时候指定的话,该ZNode的名字后面会自动Append一个不断增加的SequenceNo,该SequenceNo由其父节点进行维护。
Session(会话):
Session 指的是 Zookeeper与客户端的会话,每个Session都有一个sessionTimeout即超时时间。因为Zookeeper集群会把客户端的Session信息持久化,所以在Session没超时之前,客户端与Zookeeper Server的连接可以在各个Zookeeper Server之间透明地移动。
在实际的应用中,如果Client与Server之间的通信足够频繁,Session的维护就不需要其它额外的消息了。否则,Zookeeper Client会每t/3 ms发一次心跳给Server,如果Client 2t/3 ms没收到来自Server的心跳回应,就会换到一个新的Zookeeper Server上。这里的t指用户配置的Session超时时间。
Watcher(事件监听器):
Zookeeper支持一种Watch操作,Client可以在某个ZNode上设置一个Watcher,来Watch该ZNode上的变化。如果该ZNode上有相应的变化,就会触发这个Watcher,把相应的事件通知给设置Watcher的Client。需要注意的是,Zookeeper中的Watcher是一次性的,即触发一次就会被取消,如果想继续Watch的话,需要客户端重新设置Watcher。这个跟epoll里的oneshot模式有点类似。
ACL:
Zookeeper采用 ACL(Access Control Lists)策略来进行权限控制,类似于 UNIX 文件系统的权限控制。Zookeeper 定义了 5 种权限,如下图:
其中尤其需要注意的是,CREATE 和 DELETE 这两种权限都是针对子节点的权限控制。
4、Zookeeper特性
这一块特性就不多列举了,网上也有很多,这里只记录两个与实现分布式锁有关的特性:
1、简单互斥锁(Simple Lock)
我们知道,在传统的应用程序中,线程、进程的同步,都可以通过操作系统提供的机制来完成。但是在分布式系统中,多个进程之间的同步,操作系统层面就无能为力了。这时候就需要像Zookeeper这样的分布式协调服务来协助完成同步,下面是用Zookeeper实现简单的互斥锁的步骤,也是本文实现的理论基础:
- 多个进程尝试去在指定的目录下去创建一个临时性(Ephemeral)结点 /locks/my_lock
- Zookeeper能保证,只会有一个进程成功创建该结点,创建结点成功的进程就是抢到锁的进程,假设该进程为A
- 其它进程都对/locks/my_lock进行Watch
- 当A进程不再需要锁,可以显式删除/locks/my_lock释放锁;或者是A进程宕机后Session超时,Zookeeper系统自动删除/locks/my_lock结点释放锁。此时,其它进程就会收到Zookeeper的通知,并尝试去创建/locks/my_lock抢锁,如此循环反复
2、互斥锁(Simple Lock without Herd Effect)
在简单互斥锁的实现过程中,由于抢锁和通知的过程容易造成羊群效应,因此采用如下步骤来进行改进:
- 每个进程都在Zookeeper上创建一个临时的顺序结点(Ephemeral Sequential) /locks/lock_${seq}
- ${seq}最小的为当前的持锁者(${seq}是Zookeeper生成的Sequenctial Number)
- 其它进程都只watch比它次小的进程对应的结点,比如2 watch 1, 3 watch 2, 以此类推
- 当前持锁者释放锁后,比它次大的进程就会收到Zookeeper的通知,它成为新的持锁者,如此循环反复
需要注意的是,通常在分布式系统中用Zookeeper来做Leader Election(选主)即分布式锁,就是通过上面的机制来实现的,这里的持锁者就是当前的“主”。
5、Zookeeper常用的功能
数据发布与订阅(配置中心) |
发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。 |
注意:在上面提到的应用场景中,有个默认前提是:数据量很小,但是数据更新可能会比较快的场景。 |
负载均衡 |
这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就须要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。 |
消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的metaq都是通过zookeeper来做到生产者、消费者的负载均衡。这里以metaq为例如讲下: 生产者负载均衡:metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表之后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。
消费负载均衡: 在消费过程中,一个消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ的消费策略是:
在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。 |
命名服务(Naming Service) |
命名服务也是分布式系统中比较常见的一类场景。在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,远程对象等等——这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。 |
阿里巴巴集团开源的分布式服务框架Dubbo中使用ZooKeeper来作为其命名服务,维护全局的服务地址列表,点击这里查看Dubbo开源项目。在Dubbo实现中:
服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。 服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。 注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息。 |
分布式通知/协调 |
ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理 |
总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合 |
集群管理与Master选举 |
利用ZooKeeper有两个特性,就可以实时另一种集群机器存活性监控系统:
例如,监控系统在 /clusterServers 节点上注册一个Watcher,以后每动态加机器,那么就往 /clusterServers 下创建一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就能够实时知道机器的增减情况,至于后续处理就是监控系统的业务了。
在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。 利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。 另外,这种场景演化一下,就是动态Master选举。这就要用到?EPHEMERAL_SEQUENTIAL类型节点的特性了。 上文中提到,所有客户端创建请求,最终只有一个能够创建成功。在这里稍微变化下,就是允许所有请求都能够创建成功,但是得有个创建顺序,于是所有的请求最终在ZK上创建结果的一种可能情况是这样: /currentMaster/{sessionId}-1 ,?/currentMaster/{sessionId}-2 ,?/currentMaster/{sessionId}-3 ….. 每次选取***最小的那个机器作为Master,如果这个机器挂了,由于他创建的节点会马上小时,那么之后最小的那个机器就是Master了。 |
|
分布式锁 |
分布式锁,这个主要得益于ZooKeeper为我们保证了数据的强一致性。锁服务可以分为两类,一个是保持独占,另一个是控制时序。
|
分布式队列 |
队列方面,简单地讲有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。对于第一种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里不再赘述。
第二种队列其实是在FIFO队列的基础上作了一个增强。通常可以在 /queue 这个znode下预先建立一个/queue/num 节点,并且赋值为n(或者直接给/queue赋值n),表示队列大小,之后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否可以开始执行了。这种用法的典型场景是,分布式环境中,一个大任务Task A,需要在很多子任务完成(或条件就绪)情况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下建立自己的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现自己下面的子节点满足指定个数,就可以进行下一步按序进行处理了。 |
6、基于简单互斥锁实现分布式锁代码实现
首先分布式锁的概念我们应该都很清楚,即在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行,确保不会因为多进程同时操作而产生数据问题。该实现整体是基于zkclient客户端实现(更好的实现方案应该是基于Curator客户端实现,其提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁)详细注释也均在代码中,先给zkclient客户端pom依赖(若依赖产生冲突,排除zkclient中的部分依赖即可)
<!-- zkclient客户端 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
选主工具代码如下:
package service;
import java.net.InetAddress;
import java.net.UnknownHostException;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
/**
* 分布式锁的使用场景:分布式系统下某些写操作(类似于初始化任务、mq监听任务、定时任务等等)
* 基于zkclient实现分布式锁(即选主),利用zookeeper的简单互斥锁特性
* 该方法只适用于节点较少,否则由于简单互斥锁的特性,每次抢锁都会造成大量的进程去竞争,并且当主锁挂掉时,zk会发通知给其余所有进程:告诉他们是时候来抢锁啦,这里的抢锁和通知过程即zk中的羊群效应
* 所以在一般分布式系统中,如果使用zk来实现分布式锁都是采用互斥锁的特性,即有序节点的方式,而不是该简单方法,本文不作讨论
* Created by xujia on 2019/4/16
*/
@Slf4j
public class Leader {
/**
* 分布式系统保证一个主节点,外层业务可根据该值判断
*/
private volatile boolean isLeader = false;
/**
* zk客户端
*/
private ZkClient zkClient;
/**
* 临时节点的路径,不要与其他业务逻辑重复
*/
private String path;
/**
* 节点信息
*/
private String hostInfo;
/**
* 根节点
*/
private static final String ROOT_CATALOG = "/xj";
/**
* 初始化根节点
*/
public void initRootCatlog() {
if (zkClient.exists(ROOT_CATALOG))
return;
try {
// 创建持久化节点
zkClient.createPersistent(ROOT_CATALOG);
} catch (Exception e) {
log.warn("create root catalog error :", e);
}
}
public Leader (String zkServers, String tempPath, String host) {
// 第一个参数zkServers:zk地址,集群以逗号隔开 第二个参数sessionTimeout:会话超时时长 第三个参数connectionTimeout:连接超时时长
zkClient = new ZkClient(new ZkConnection(zkServers, 10000), 10000);
this.path = ROOT_CATALOG + tempPath;
try {
// 获取当前主机信息
hostInfo = InetAddress.getLocalHost().toString();
} catch (UnknownHostException e) {
log.warn("get local host info error :", e);
}
// 为了测试,写死
hostInfo = host;
// 初始化根节点
initRootCatlog();
// 抢锁
tryLeader();
// 监听指定节点数据变化
zkClient.subscribeDataChanges(path, new IZkDataListener() {
/**
* 节点数据改变时触发,该接口只会对所监控的路径的数据变化,子节点数据发送变化不会被监控到,例如s为/xj,若/xj/chile路径上的数据发生变化了则不会被监控到
* @param path 节点路径
* @param data 节点数据
* @throws Exception
*/
public void handleDataChange(String path, Object data) throws Exception {
// 数据改变 无需重新获取主锁
}
/**
* 节点删除时触发
* @param path 节点路径
* @throws Exception
*/
public void handleDataDeleted(String path) throws Exception {
// 主节点挂了,其余阻塞节点进入该方法尝试获取锁
if (log.isInfoEnabled())
log.info(hostInfo + "start to get lock on :" + path);
tryLeader();
}
});
}
/**
* 获取主锁,即选举leader
*/
public void tryLeader() {
if (!zkClient.exists(path)) {
try {
// 创建临时节点,当同一时刻多个进程同时创建时,Zookeeper能保证只有一个进程会创建成功,此时则为持有主锁
zkClient.createEphemeral(path, hostInfo);
isLeader = true;
if (log.isInfoEnabled())
log.info(hostInfo + "become leader success on :" + path);
} catch (Exception e) {
isLeader = false;
if (log.isInfoEnabled())
log.info(hostInfo + "become leader failed on :" + path);
}
}
if (log.isInfoEnabled())
log.info("temporary point has built,wait for next leader elect :" + path);
}
/**
* 提供给外部调用,获取主从状态
* @return
*/
public boolean isLeader() {
return isLeader;
}
/**
* 显式关闭zkclient
*/
public void close() {
try {
zkClient.unsubscribeAll();
zkClient.close();
} catch (Exception e) {
log.warn("Close zkClient error:", e);
}
isLeader = false;
}
public static void main(String[] args) {
// 模拟三个客户端
for (int j = 0 ; j < 3 ; j++) {
Thread thread = new Thread(new LeaderThread("xxxxx", "/test","host" + j));
thread.start();
}
}
}
/**
* 模拟多个进程同时争抢锁
*/
@Slf4j
class LeaderThread implements Runnable {
private String zkServers;
private String path;
private String host;
public LeaderThread(String zkServers, String path, String host) {
this.host = host;
this.zkServers = zkServers;
this.path = path;
}
@Override
public void run() {
Leader zk = new Leader(zkServers, path, host);
int i = 0;
// 防止线程退出
while (true) {
try {
Thread.sleep(10000);
i++;
// 每2s模拟一次宕机
if (i % 2 == 0) {
if (zk.isLeader()) {
zk.close();
log.info(host + "closed");
}
}
log.info("init finish current " + host +" status " + zk.isLeader());
} catch (InterruptedException e) {
}
}
}
}
来看下部分输出结果(更多信息可自行观察):
// 如下是当某个进程挂掉之后,Zookeeper发送通知给其余两个进程并触发监听事件
11:45:18.420 [Thread-1-SendThread(10.1.61.101:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got notification sessionid:0x16a2353e4ae1e38
11:45:18.420 [Thread-2-SendThread(10.1.61.101:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got notification sessionid:0x16a2353e4ae1e39
11:45:18.420 [Thread-1-SendThread(10.1.61.101:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeDeleted path:/xj/test for sessionid 0x16a2353e4ae1e38
11:45:18.420 [Thread-2-SendThread(10.1.61.101:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeDeleted path:/xj/test for sessionid 0x16a2353e4ae1e39