Cris 的 ZooKeeper 学习笔记
文章目录
1、安装 ZooKeeper
Ⅰ. 解压
首先打开三台 Linux
,然后将 ZooKeeper
的压缩包上传到 Linux
指定目录下,并且解压到指定目录下即可
Ⅱ、配置环境变量(可选)
如果想要每次输入命令都不必给出 ZooKeeper
的命令所在目录,可以像 Java
或者 Hadoop
那样配置环境变量,这样子在任意目录都可以快捷使用 ZooKeeper
提供的命令
输入以下 Shell
命令
sudo vim /etc/profile
然后如下修改即可
ⅲ、修改配置文件以及同步
修改 ZooKeeper
的配置文件
快速使用之前写好的同步脚本命令为另外两台机子同步 ZooKeeper
的解压包和 /etc/profile
xsync /opt/module/ZooKeeper-3.4.10
# 这里需要使用sudo 权限同步 /opt/profile 文件
sudo /home/cris/bin/xsync /opt/profile
同步之后,需要测试单台 Linux 是否可以启动 ZooKeeper的服务
推荐使用 jps -l
指令查看运行在 jvm
上的程序
测试成功以后,我们就可以进行 ZooKeeper集群配置和群起/关 脚本的编写了
Ⅳ、ZooKeeper集群搭建以及集群脚本编写
配置 zoo.cfg
文件
配置 zkData
下的 myid
文件
同步文件到其他两台 Linux
机器,然后将其他两台的 myid
文件分别设置为 2(102) 和 3(103)
设置成功以后就可以编写群起/关脚本了
#!/bin/bash
for i in 101 102 103
do
# 关键是先要执行 source 同步环境变量,因为每次 ssh 登陆到远程主机的环境变量都不一样
ssh [email protected]$i 'source /etc/profile;/opt/module/ZooKeeper-3.4.10/bin/zkServer.sh start'
done
read -p "请耐心等待1秒钟" -t 1
echo
jpsall
然后是群关脚本
#!/bin/bash
for i in 101 102 103
do
ssh [email protected]$i 'source /etc/profile;/opt/module/ZooKeeper-3.4.10/bin/zkServer.sh stop'
done
read -p "请耐心等待1秒钟" -t 1
echo
jpsall
和群起脚本几乎一模一样~ ??
然后同步脚本到其他两台机子(可选)
在 101 号机子上使用群起脚本
查看 ZooKeeper
集群的主从状态
为什么是 102 成为了 leader?
- 半数机制(Paxos(帕克索斯) 协议):集群中半数以上机器存活,集群可用。所以ZooKeeper适合装在奇数台机器上。
- ZooKeeper虽然在配置文件中并没有指定master和slave。但是,ZooKeeper工作时,是有一个节点为leader,其他则为follower,Leader是通过内部的选举机制临时产生的
- 内部选举机制:以当前三台机子的集群为例,101 启动时只有当前一个节点,而总启动节点数量不过半(超过1.5),所以 101 节点为 follower,然后 102 启动,此时总启动节点数量为2,过半就将 102 设置为 leader,而 103 启动时,已经有 leader 节点了,所以 103 仍然为 follower
2、客户端操作 ZooKeeper
①、命令行操作
输入 zkCli.sh
即可进入当前 Linux
的 ZooKeeper
客户端(如果没出现 zk:2181 的框再回车即可)
注意:此时该 Linux 会启动一个 ZooKeeperMain
进程
具体的命令操作这里就不列出了,直接放上命令大全,使用的时候查询即可
命令基本语法 | 功能描述 |
---|---|
help | 显示所有操作命令 |
ls path [watch] | 使用 ls 命令来查看当前znode中所包含的内容 |
ls2 path [watch] | 查看当前节点数据并能看到更新次数等数据 |
create | 普通创建 -s 含有序列 -e 临时(重启或者超时消失) |
get path [watch] | 获得节点的值 |
set | 设置节点的具体值 |
stat | 查看节点状态 |
delete | 删除节点 |
rmr | 递归删除节点 |
值得一提的时,当我们创建好了 Znode
节点后,ZooKeeper
可以自动同步到其他 Linux
的 ZooKeeper
服务器
Znode 有两种类型
短暂(ephemeral
):客户端和服务器端断开连接后,创建的节点自己删除
持久(persistent
):客户端和服务器端断开连接后,创建的节点不删除
Znode 有四种形式的目录节点,默认是 persistent
(1)持久化目录节点(PERSISTENT
)客户端与 ZooKeeper
断开连接后,该节点依旧存在
(2)持久化顺序编号目录节点(PERSISTENT_SEQUENTIAL
)客户端与 ZooKeeper
断开连接后,该节点依旧存在,只是 ZooKeeper
给该节点名称进行顺序编号
(3)临时目录节点(EPHEMERAL
)客户端与 ZooKeeper
断开连接后,该节点被删除
(4)临时顺序编号目录节点(EPHEMERAL_SEQUENTIAL
)客户端与 ZooKeeper
断开连接后,该节点被删除,只是 ZooKeeper
给该节点名称进行顺序编号
②、IDEA 客户端连接 ZooKeeper
操作客户端示例代码:
/**
* 客户端使用 zk api 来和远程 zk 服务器交互,重点是监听器原理
*
* @author zc-cris
* @version 1.0
**/
@SuppressWarnings("JavaDoc")
public class ZkTest {
/**
* ZooKeeper客户端端
**/
private ZooKeeperzooKeeper;
/**
* 连接超时时间单位为 ms
**/
private int timeOut;
/**
* 远程 ZooKeeper服务所在 Linux 服务器的 ip 和 port
**/
private String connectString;
public ZkTest() {
connectString = "192.168.1.101:2181,hadoop102:2181,hadoop103:2181";
timeOut = 2000;
}
/**
* 获取和远程 ZooKeeper服务器的连接
*
* @throws IOException
* @throws InterruptedException
*/
@Before
public void getConnection() throws IOException, InterruptedException {
// 创建 zk 的服务端对象,这里监听器使用了 lambda 表达式代替匿名内部类
ZooKeeper= new ZooKeeper(connectString, timeOut, watchedEvent -> {
// 收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(watchedEvent.getType() + "----------" + watchedEvent.getPath());
try {
// 记住:每次执行监听器的方法都还要注册监听事件
zooKeeper.getChildren("/", true);
zooKeeper.getData("/simida", true, null);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
});
}
/**
* 创建 zk 上的节点及其数据
*
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testCreate() throws KeeperException, InterruptedException {
// 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
String s = zooKeeper.create("/cris02", "cris02".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("s = " + s);
}
/**
* 获取远程 zk 上的指定的路径列表
*
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testGetChildren() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/", false);
children.forEach(System.out::println);
}
/**
* 通过阻塞模拟客户端一直运行
*
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testGetChildren2() throws KeeperException, InterruptedException {
zooKeeper.getChildren("/", true);
// 延时阻塞
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 修改节点的内容
*
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testUpdate() throws KeeperException, InterruptedException {
zooKeeper.setData("/simida", "123".getBytes(), -1);
}
/**
* 获取指定节点是否存在
*
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testExist() throws KeeperException, InterruptedException {
Stat exists = zooKeeper.exists("/cris", false);
System.out.println(exists == null ? "exist" : "not exist");
}
/**
* 关闭客户端和远程 zk 服务器的连接
*
* @throws InterruptedException
*/
@After
public void closeConnection() throws InterruptedException {
zooKeeper.close();
}
}
以上代码模拟不同客户端和 ZooKeeper服务器之间的交互
首先执行 testGetChildren2
方法,因为堵塞的原因,该线程模拟的客户端生成的 ZooKeeper
对象会一直存在,监听器时刻准备着监听~
然后执行 testUpdate
或者 testCreate
模拟其他客户端修改 ZooKeeper服务器上的 node
节点内容或者创建新的 node
节点,观察 console
,ZooKeeper
服务器会发送消息到堵塞的线程,此时被监听器捕获到到并输出响应的内容,然后重新注册监听行为~
稍微有点绕~ ?
针对堵塞线程执行流程更加专业的说法如下:
1)首先要有一个main()线程
2)在main线程中创建ZooKeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3)通过connect线程将注册的监听事件发送给ZooKeeper。
4)在ZooKeeper的注册监听器列表中将注册的监听事件添加到列表中。
5)ZooKeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
6)listener线程内部调用了process()方法。
3. 监听服务器节点动态上下线案例
需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线
需求图
server 端代码
/**
* 模拟服务者,也叫生产者,为消费者提供服务的服务器
*
* @author zc-cris
* @version 1.0
**/
public class Server {
/**
* ZooKeeper客户端
**/
private ZooKeeperzooKeeper;
/**
* 连接超时时间单位为 ms
**/
private static int timeOut;
/**
* 远程运行 ZooKeeper服务的 Linux 服务器的 ip 和 port
**/
private static String connectString;
/**
* 服务者注册到 ZooKeeper的主 node 节点名称
**/
private static String parentServer;
static {
connectString = "192.168.1.101:2181,hadoop102:2181,hadoop103:2181";
timeOut = 2000;
parentServer = "/servers";
}
public static void main(String[] args) {
Server server = new Server();
// 1. 创建 ZooKeeper对象
server.getConnection();
// 2. 注册当前服务者到 ZooKeeper服务列表中
server.registerServer(args[0]);
// 3. 当前服务者跑的服务
server.business(args[0]);
}
/**
* 连接 ZooKeeper服务并获取 ZooKeeper对象,这里的 watcher 最好别为空,否则出现异常(但是不影响程序运行)
*/
private void getConnection() {
// 创建 ZooKeeper客户端对象
try {
ZooKeeper= new ZooKeeper(connectString, timeOut, watchedEvent -> {
});
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 注册自己到 ZooKeeper指定列表,注意:必须使用 CreateMode.EPHEMERAL_SEQUENTIAL 模式
* 否则会出现服务者重名而无法注册成功
*
* @param hostname 服务者的主机名
*/
private void registerServer(String hostname) {
try {
zooKeeper.create(parentServer + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
System.out.println(hostname + " 已经注册到 ZooKeeper了");
}
/**
* 模拟服务者的业务
*
* @param hostname 服务者的主机名
*/
private void business(String hostname) {
System.out.println(hostname + "跑业务中....");
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Client 代码
/**
* 模拟消费者
*
* @author zc-cris
* @version 1.0
**/
public class Client {
/**
* ZooKeeper客户端端
**/
private ZooKeeperzooKeeper;
/**
* 连接超时时间单位为 ms
**/
private static int timeOut;
/**
* 远程 ZooKeeper服务所在 Linux 服务器的 ip 和 port
**/
private static String connectString;
/**
* 服务者注册到 ZooKeeper的主 node 节点名称
**/
private static String parentServer;
static {
connectString = "192.168.1.101:2181,hadoop102:2181,hadoop103:2181";
timeOut = 2000;
parentServer = "/servers";
}
/**
* 标志位,用于判断第一次连接 ZooKeeper不执行 getServers 的逻辑代码
**/
private boolean flag = false;
public static void main(String[] args) {
Client client = new Client();
// 1. 创建 ZooKeeper对象
client.getConnection();
// 2. 获取到 ZooKeeper服务上的所有服务者的注册列表
client.getServers();
// 3. 当前消费者跑的业务
client.business();
}
/**
* 获取 ZooKeeper上注册的所有服务者的列表并打印出来
* 每次服务者列表的变动,ZooKeeper会自动通知到该消费者,该消费者持有的 ZooKeeper对象里面的监听器就会自动调用
* getServers 方法
*/
private void getServers() {
if (flag) {
try {
List<String> children = zooKeeper.getChildren(parentServer, true);
// lambda 表达式对列表中的每一个元素进行打印操作
children.forEach(c ->
{
try {
System.out.println(new String(zooKeeper.getData(parentServer + "/" + c, false, null))
+ " 服务者运行着");
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
});
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
} else {
flag = true;
}
}
/**
* 连接 ZooKeeper服务并初始化 ZooKeeper对象,并且通过 lambda 表达式 创建监听器 Watcher 对象,重写 process 方法
*/
private void getConnection() {
try {
ZooKeeper= new ZooKeeper(connectString, timeOut, watchedEvent -> getServers());
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 堵塞模拟一直运行的客户端
*/
private void business() {
System.out.println("客户端跑着服务");
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果如下
三台 Linux
运行 ZooKeeper
如下
输入 create /servers 'servers'
事先创建好服务者注册列表 node
然后启动 Server
程序三次
打开三台 Linux
任意一台的 ZooKeeper
客户端
可以发现服务者列表随着服务者程序的启动已经注册上去了
然后启动 Client
关掉 101 服务者,然后查看 Client 的console
再次查看 Linux
上的 ZooKeeper
节点情况
发现随着 101
服务者的下线,服务列表也跟着修改了
然后重新上线 101
服务者