ZooKeeper实现分布式队列Queue
ZooKeeper是一个分步式的协作系统,何为协作,ZooKeeper价值又有何体现。关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及java编程命令操作
目录
- 分布式队列
- 设计思路
-
程序实现
队列有很多种产品,大都是消息系统所实现的,像ActiveMQ,JBossMQ,RabbitMQ,IBM-MQ等。分步式队列产品并不太多,像Beanstalkd。
本文实现的分布式对列,是基于ZooKeeper现实的一种同步的分步式队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。
2. 设计思路创建一个父目录 /queue,每个成员都监控(Watch)标志位目录/queue/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /queue/x(i)的临时目录节点,然后每个成员获取 /queue 目录的所有目录节点,也就是 x(i)。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /queue/start 的出现,如果已经相等就创建 /queue/start。
产品流程图
应用实例
图标解释
- app1,app2,app3,app4是4个独立的业务系统
- zk1,zk2,zk3是ZooKeeper集群的3个连接点
- /queue,是znode的队列,假设队列长度为3
- /queue/x1,是znode队列中,1号排对者,由app1提交,同步请求,app1挂载等待
- /queue/x2,是znode队列中,2号排对者,由app2提交,同步请求,app2挂起等待
- /queue/x3,是znode队列中,3号排对者,由app3提交,同步请求,app3挂起等待
- /queue/start,当znode队列中满了,触发创建开始节点
-
当/qeueu/start被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束
注:
- 1). 创建/queue/x1,/queue/x2,/queue/x3没有前后顺序,提交后程序就同步挂起。
- 2). app1可以通过zk2提交,app2也可通过zk3提交
- 3). app1可以提交3次请求,生成x1,x2,x3使用队列充满
- 4). /queue/start被创建后,zk1会监听到这个事件,再告诉app1,队列已完成!
详细代码实现:
-
import java.io.IOException;
-
-
import org.apache.zookeeper.CreateMode;
-
import org.apache.zookeeper.KeeperException;
-
import org.apache.zookeeper.WatchedEvent;
-
import org.apache.zookeeper.Watcher;
-
import org.apache.zookeeper.ZooKeeper;
-
import org.apache.zookeeper.ZooDefs.Ids;
-
-
public class QueueZooKeeper {
-
-
public static void main(String[] args) throws Exception {
-
if (args.length == 0) {
-
doOne();
-
} else {
-
doAction(Integer.parseInt(args[0]));
-
}
-
}
-
-
public static void doOne() throws Exception {
-
String host1 = "192.168.1.201:2181";
-
ZooKeeper zk = connection(host1);
-
initQueue(zk);
-
joinQueue(zk, 1);
-
joinQueue(zk, 2);
-
joinQueue(zk, 3);
-
zk.close();
-
}
-
-
public static void doAction(int client) throws Exception {
-
String host1 = "192.168.1.201:2181";
-
String host2 = "192.168.1.201:2182";
-
String host3 = "192.168.1.201:2183";
-
-
ZooKeeper zk = null;
-
switch (client) {
-
case 1:
-
zk = connection(host1);
-
initQueue(zk);
-
joinQueue(zk, 1);
-
break;
-
case 2:
-
zk = connection(host2);
-
initQueue(zk);
-
joinQueue(zk, 2);
-
break;
-
case 3:
-
zk = connection(host3);
-
initQueue(zk);
-
joinQueue(zk, 3);
-
break;
-
}
-
}
-
-
// 创建一个与服务器的连接
-
public static ZooKeeper connection(String host) throws IOException {
-
ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
-
// 监控所有被触发的事件
-
public void process(WatchedEvent event) {
-
if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals("/queue/start")) {
-
System.out.println("Queue has Completed.Finish testing!!!");
-
}
-
}
-
});
-
return zk;
-
}
-
-
public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
-
System.out.println("WATCH => /queue/start");
-
zk.exists("/queue/start", true);
-
-
if (zk.exists("/queue", false) == null) {
-
System.out.println("create /queue task-queue");
-
zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
} else {
-
System.out.println("/queue is exist!");
-
}
-
}
-
-
public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
-
System.out.println("create /queue/x" + x + " x" + x);
-
zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
isCompleted(zk);
-
}
-
-
public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
-
int size = 3;
-
int length = zk.getChildren("/queue", true).size();
-
-
System.out.println("Queue Complete:" + length + "/" + size);
-
if (length >= size) {
-
System.out.println("create /queue/start start");
-
zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-
}
-
}
-
- }