基于zookeeper 实现分布式读写锁( 思路 + 代码 )
基于zookeeper 实现分布式读写锁
分布式锁
如果不同的系统或一个系统的不同主机共享了一组资源,那么访问这些资源的时候,就需要通过一些互斥的手段来防止彼此之间的干扰,以保证一致性,在这种情况下,就需要使用分布式锁了。
读写锁
读写锁是同一时刻可以允许多个读操作访问,但是在写操作访问时,所有的读操作和其他写操作均被阻塞。Java 并发包提供的读写锁实现是ReentantReadWriteLock
。
分布式读写锁思路
下面我们介绍基于zookeeper 如何实现分布式读写锁:
如何定义锁节点:
通过zookeeper 上的数据节点来表示一个锁,比如:/lock/请求类型-顺序号。这个节点是临时节点,其后的序号由zookeeper 顺序生成,代表zookeeper 节点的全局唯一顺序,例如/lock/READ-00000001 就表示一个类型为读的分布式锁。zookeeper 节点结构如下:
如何获取锁:
在获取锁时,客户端到/lock 节点下创建一个临时顺序节点,如果是读,就创建一个形如/lock/READ- 的临时顺序节点,然后zookeeper 会返回一个完整的节点,形如/lock/READ-00000002 的带有顺序号的临时节点,这个节点就代表着你将要获取的锁。
判断是否得到锁:
如果是写锁:
- 获取/lock 下的子节点
- 按照顺序号排序
- 检查此写锁之前是否还有其他锁,若有先注册对该写锁前一个锁的监听,然后阻塞该写锁获取,若监听到该写锁前一个锁已释放,则该写锁打开阻塞。
如果是读锁:
- 获取/lock 下的子节点
- 按照顺序号排序
- 检查此读锁之前是否有写锁,若有先注册对该读锁的前一个写锁的监听,然后阻塞该读锁的获取。若监听到该读锁前一个写锁已释放,则该读锁打开阻塞。
分布式读写锁代码实现
public final class DLock
{
private ZkClient zkClient;
private String lockName;
private String thisReadLock;
private String thisWriteLock;
/**
* 分布式锁连接zookeeper 以及 初始化锁主节点
*
* @param hostUrl zookeeper 连接url
* @param lockName 锁主节点
*/
public void connect(String hostUrl , String lockName)
{
this.lockName = lockName;
zkClient = new ZkClient(hostUrl);
if (!zkClient.exists(lockName))
zkClient.createPersistent(lockName);
}
/**
* 获取读锁
*/
public void lockRead()
{
CountDownLatch readLatch = new CountDownLatch(1);
// 创建此临时节点, 获取带有顺序号的完整节点
String thisLockNodeBuilder = lockName +
"/" +
LockType.READ +
"-";
thisReadLock = zkClient.createEphemeralSequential(thisLockNodeBuilder , "");
// 找到此读锁前一个写锁
List<String> tmp_nodes = zkClient.getChildren(lockName);
sortNodes(tmp_nodes);
tmp_nodes.forEach(System.out::println);
int tmp_index = 0;
for (int i = tmp_nodes.size() - 1; i >= 0; i--)
{
if (thisReadLock.equals(lockName + "/" + tmp_nodes.get(i)))
{
tmp_index = i;
} else if (i < tmp_index && tmp_nodes.get(i).split("-")[0].equals(LockType.WRITE.toString()))
{
// 找到当前读锁之前的一个写锁
// 先监听此写锁,再阻塞当前读锁
zkClient.subscribeChildChanges(lockName + "/" + tmp_nodes.get(i) , (parentPath , currentChilds) -> readLatch.countDown());
try
{
readLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
}
break;
}
}
}
/**
* 释放读锁
*/
public void unLockRead()
{
if (this.thisReadLock != null)
{
zkClient.delete(thisReadLock);
thisReadLock = null;
}
}
/**
* 获取写锁
*/
public void lockWrite()
{
CountDownLatch writeLatch = new CountDownLatch(1);
// 创建此临时节点, 获取带有顺序号的完整节点
String thisLockNodeBuilder = lockName +
"/" +
LockType.WRITE +
"-";
thisWriteLock = zkClient.createEphemeralSequential(thisLockNodeBuilder , "");
List<String> tmp_nodes = zkClient.getChildren(lockName);
sortNodes(tmp_nodes);
for (int i = tmp_nodes.size() - 1; i >= 0; i--)
{
if (thisWriteLock.equals(lockName + "/" + tmp_nodes.get(i)))
{
// 在锁列表中找到此写锁
if (i > 0)
{
// 如果此写锁前面还有锁
// 监听前面的锁, 然后阻塞当前写锁获取
zkClient.subscribeChildChanges(lockName + "/" + tmp_nodes.get(i - 1) , (parentPath , currentChilds) -> writeLatch.countDown());
try
{
// 阻塞当前写锁获取
writeLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
}
break;
}
}
}
}
/**
* 释放写锁
*/
public void unLockWrite()
{
if (thisWriteLock != null)
{
zkClient.delete(thisWriteLock);
thisWriteLock = null;
}
}
/**
* 节点按照顺序号排序
*
* @param nodes 临时节点
*/
private void sortNodes(List<String> nodes)
{
nodes.sort(Comparator.comparing(o -> o.split("-")[1]));
}
/**
* 锁类型枚举
*/
private enum LockType
{
READ,
WRITE;
}
}
测试
测试思路:依次开启四个客户端,分别获取读锁(10s 后释放)、读锁(立即释放)、写锁(10s 后释放)、读锁(立即释放)。
客户端代码:
// 客户端1
DLock lock=new DLock();
lock.connect("123.56.24.247:2181", "/lock");
lock.lockRead();
System.out.println("I am testNode 1");
try
{
System.out.println("I am TestNode 1");
System.out.println("睡眠10s 之后释放分布式读锁, 开始倒计时");
for (int i = 0; i < 10; i++)
{
System.out.println(10-i);
Thread.sleep(1000);
}
} catch (InterruptedException e)
{
e.printStackTrace();
}
lock.unLockRead();
// 客户端2
DLock lock=new DLock();
lock.connect("123.56.24.247:2181", "/lock");
lock.lockRead();
System.out.println("I am TestNode 2");
lock.unLockRead();
// 客户端3
DLock lock=new DLock();
lock.connect("123.56.24.247:2181", "/lock");
lock.lockWrite();
System.out.println("I am testNode 3");
System.out.println("睡眠10s 之后释放分布式写锁, 开始倒计时");
for (int i = 0; i < 10; i++)
{
System.out.println(10-i);
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
lock.unLockWrite();
// 客户端4
DLock lock=new DLock();
lock.connect("123.56.24.247:2181", "/lock");
lock.lockRead();
System.out.println("I am TestNode 4");
lock.unLockRead();
测试结果:
- 第一个客户端获取到读锁,输出:I am TestNode 1,然后10s 倒计时释放准备释放读锁。
- 第二个客户端立马获取到读锁,输出 I am TestNode 2
- 第三个客户端等待10s 后才获取到写锁,输出 I am TestNode 3
- 第四个客户端等待第三个客户端的写锁释放后才获取到读锁,输出:I am TestNode 4