Cris 的 ZooKeeper 学习笔记

1、安装 ZooKeeper

Ⅰ. 解压

首先打开三台 Linux ,然后将 ZooKeeper 的压缩包上传到 Linux 指定目录下,并且解压到指定目录下即可

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记

Ⅱ、配置环境变量(可选)

如果想要每次输入命令都不必给出 ZooKeeper 的命令所在目录,可以像 Java 或者 Hadoop 那样配置环境变量,这样子在任意目录都可以快捷使用 ZooKeeper 提供的命令

输入以下 Shell 命令

sudo vim /etc/profile

然后如下修改即可

Cris 的 ZooKeeper 学习笔记

ⅲ、修改配置文件以及同步

修改 ZooKeeper 的配置文件

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记

快速使用之前写好的同步脚本命令为另外两台机子同步 ZooKeeper 的解压包和 /etc/profile

xsync /opt/module/ZooKeeper-3.4.10
# 这里需要使用sudo 权限同步 /opt/profile 文件
sudo /home/cris/bin/xsync /opt/profile	

同步之后,需要测试单台 Linux 是否可以启动 ZooKeeper的服务

Cris 的 ZooKeeper 学习笔记

推荐使用 jps -l 指令查看运行在 jvm 上的程序

测试成功以后,我们就可以进行 ZooKeeper集群配置和群起/关 脚本的编写了

Ⅳ、ZooKeeper集群搭建以及集群脚本编写

配置 zoo.cfg 文件

Cris 的 ZooKeeper 学习笔记

配置 zkData 下的 myid 文件

Cris 的 ZooKeeper 学习笔记

同步文件到其他两台 Linux 机器,然后将其他两台的 myid 文件分别设置为 2(102) 和 3(103)

设置成功以后就可以编写群起/关脚本了

#!/bin/bash

for i in 101 102 103
do
# 关键是先要执行 source 同步环境变量,因为每次 ssh 登陆到远程主机的环境变量都不一样
ssh [email protected]$i 'source /etc/profile;/opt/module/ZooKeeper-3.4.10/bin/zkServer.sh start'
done
read -p "请耐心等待1秒钟" -t 1
echo
jpsall

然后是群关脚本

#!/bin/bash

for i in 101 102 103
do
ssh [email protected]$i 'source /etc/profile;/opt/module/ZooKeeper-3.4.10/bin/zkServer.sh stop'
done
read -p "请耐心等待1秒钟" -t 1
echo
jpsall

和群起脚本几乎一模一样~ ??

然后同步脚本到其他两台机子(可选)

在 101 号机子上使用群起脚本

Cris 的 ZooKeeper 学习笔记

查看 ZooKeeper 集群的主从状态

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记

为什么是 102 成为了 leader?

  1. 半数机制(Paxos(帕克索斯) 协议):集群中半数以上机器存活,集群可用。所以ZooKeeper适合装在奇数台机器上。
  2. ZooKeeper虽然在配置文件中并没有指定master和slave。但是,ZooKeeper工作时,是有一个节点为leader,其他则为follower,Leader是通过内部的选举机制临时产生的
  3. 内部选举机制:以当前三台机子的集群为例,101 启动时只有当前一个节点,而总启动节点数量不过半(超过1.5),所以 101 节点为 follower,然后 102 启动,此时总启动节点数量为2,过半就将 102 设置为 leader,而 103 启动时,已经有 leader 节点了,所以 103 仍然为 follower

2、客户端操作 ZooKeeper

①、命令行操作

输入 zkCli.sh 即可进入当前 LinuxZooKeeper 客户端(如果没出现 zk:2181 的框再回车即可)

注意:此时该 Linux 会启动一个 ZooKeeperMain 进程

Cris 的 ZooKeeper 学习笔记

具体的命令操作这里就不列出了,直接放上命令大全,使用的时候查询即可

命令基本语法 功能描述
help 显示所有操作命令
ls path [watch] 使用 ls 命令来查看当前znode中所包含的内容
ls2 path [watch] 查看当前节点数据并能看到更新次数等数据
create 普通创建 -s 含有序列 -e 临时(重启或者超时消失)
get path [watch] 获得节点的值
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
rmr 递归删除节点

值得一提的时,当我们创建好了 Znode 节点后,ZooKeeper 可以自动同步到其他 LinuxZooKeeper 服务器

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记

Znode 有两种类型

短暂(ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
持久(persistent):客户端和服务器端断开连接后,创建的节点不删除

Znode 有四种形式的目录节点,默认是 persistent

(1)持久化目录节点(PERSISTENT)客户端与 ZooKeeper 断开连接后,该节点依旧存在

(2)持久化顺序编号目录节点(PERSISTENT_SEQUENTIAL)客户端与 ZooKeeper 断开连接后,该节点依旧存在,只是 ZooKeeper 给该节点名称进行顺序编号

(3)临时目录节点(EPHEMERAL)客户端与 ZooKeeper 断开连接后,该节点被删除

(4)临时顺序编号目录节点(EPHEMERAL_SEQUENTIAL)客户端与 ZooKeeper 断开连接后,该节点被删除,只是 ZooKeeper 给该节点名称进行顺序编号

②、IDEA 客户端连接 ZooKeeper

操作客户端示例代码:

/**
 * 客户端使用 zk api 来和远程 zk 服务器交互,重点是监听器原理
 *
 * @author zc-cris
 * @version 1.0
 **/
@SuppressWarnings("JavaDoc")
public class ZkTest {

    /**
     * ZooKeeper客户端端
     **/
    private ZooKeeperzooKeeper;
    /**
     * 连接超时时间单位为 ms
     **/
    private int timeOut;
    /**
     * 远程 ZooKeeper服务所在 Linux 服务器的 ip 和 port
     **/
    private String connectString;

    public ZkTest() {
        connectString = "192.168.1.101:2181,hadoop102:2181,hadoop103:2181";
        timeOut = 2000;
    }

    /**
     * 获取和远程 ZooKeeper服务器的连接
     *
     * @throws IOException
     * @throws InterruptedException
     */
    @Before
    public void getConnection() throws IOException, InterruptedException {
        // 创建 zk 的服务端对象,这里监听器使用了 lambda 表达式代替匿名内部类
        ZooKeeper= new ZooKeeper(connectString, timeOut, watchedEvent -> {
            // 收到事件通知后的回调函数(用户的业务逻辑)
            System.out.println(watchedEvent.getType() + "----------" + watchedEvent.getPath());
            try {
                // 记住:每次执行监听器的方法都还要注册监听事件
                zooKeeper.getChildren("/", true);
                zooKeeper.getData("/simida", true, null);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 创建 zk 上的节点及其数据
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testCreate() throws KeeperException, InterruptedException {
        // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
        String s = zooKeeper.create("/cris02", "cris02".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("s = " + s);
    }

    /**
     * 获取远程 zk 上的指定的路径列表
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testGetChildren() throws KeeperException, InterruptedException {
        List<String> children = zooKeeper.getChildren("/", false);
        children.forEach(System.out::println);
    }

    /**
     * 通过阻塞模拟客户端一直运行
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testGetChildren2() throws KeeperException, InterruptedException {
        zooKeeper.getChildren("/", true);
        // 延时阻塞
        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 修改节点的内容
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testUpdate() throws KeeperException, InterruptedException {
        zooKeeper.setData("/simida", "123".getBytes(), -1);
    }

    /**
     * 获取指定节点是否存在
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testExist() throws KeeperException, InterruptedException {
        Stat exists = zooKeeper.exists("/cris", false);
        System.out.println(exists == null ? "exist" : "not exist");
    }

    /**
     * 关闭客户端和远程 zk 服务器的连接
     *
     * @throws InterruptedException
     */
    @After
    public void closeConnection() throws InterruptedException {
        zooKeeper.close();
    }
}

以上代码模拟不同客户端和 ZooKeeper服务器之间的交互

​ 首先执行 testGetChildren2 方法,因为堵塞的原因,该线程模拟的客户端生成的 ZooKeeper 对象会一直存在,监听器时刻准备着监听~

​ 然后执行 testUpdate 或者 testCreate 模拟其他客户端修改 ZooKeeper服务器上的 node 节点内容或者创建新的 node 节点,观察 consoleZooKeeper 服务器会发送消息到堵塞的线程,此时被监听器捕获到到并输出响应的内容,然后重新注册监听行为~

​ 稍微有点绕~ ?

​ 针对堵塞线程执行流程更加专业的说法如下:

1)首先要有一个main()线程
2)在main线程中创建ZooKeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3)通过connect线程将注册的监听事件发送给ZooKeeper。
4)在ZooKeeper的注册监听器列表中将注册的监听事件添加到列表中。
5)ZooKeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
6)listener线程内部调用了process()方法。

3. 监听服务器节点动态上下线案例

需求

​ 某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线

需求图

Cris 的 ZooKeeper 学习笔记

server 端代码

/**
 * 模拟服务者,也叫生产者,为消费者提供服务的服务器
 *
 * @author zc-cris
 * @version 1.0
 **/
public class Server {

    /**
     * ZooKeeper客户端
     **/
    private ZooKeeperzooKeeper;
    /**
     * 连接超时时间单位为 ms
     **/
    private static int timeOut;
    /**
     * 远程运行 ZooKeeper服务的 Linux 服务器的 ip 和 port
     **/
    private static String connectString;
    /**
     * 服务者注册到 ZooKeeper的主 node 节点名称
     **/
    private static String parentServer;

    static {
        connectString = "192.168.1.101:2181,hadoop102:2181,hadoop103:2181";
        timeOut = 2000;
        parentServer = "/servers";
    }


    public static void main(String[] args) {

        Server server = new Server();
        // 1. 创建 ZooKeeper对象
        server.getConnection();

        // 2. 注册当前服务者到 ZooKeeper服务列表中
        server.registerServer(args[0]);

        // 3. 当前服务者跑的服务
        server.business(args[0]);
    }

    /**
     * 连接 ZooKeeper服务并获取 ZooKeeper对象,这里的 watcher 最好别为空,否则出现异常(但是不影响程序运行)
     */
    private void getConnection() {
        // 创建 ZooKeeper客户端对象
        try {
            ZooKeeper= new ZooKeeper(connectString, timeOut, watchedEvent -> {

            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 注册自己到 ZooKeeper指定列表,注意:必须使用 CreateMode.EPHEMERAL_SEQUENTIAL 模式
     * 否则会出现服务者重名而无法注册成功
     *
     * @param hostname 服务者的主机名
     */
    private void registerServer(String hostname) {
        try {
            zooKeeper.create(parentServer + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(hostname + " 已经注册到 ZooKeeper了");
    }

    /**
     * 模拟服务者的业务
     *
     * @param hostname 服务者的主机名
     */
    private void business(String hostname) {
        System.out.println(hostname + "跑业务中....");
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Client 代码

/**
 * 模拟消费者
 *
 * @author zc-cris
 * @version 1.0
 **/
public class Client {
    /**
     * ZooKeeper客户端端
     **/
    private ZooKeeperzooKeeper;
    /**
     * 连接超时时间单位为 ms
     **/
    private static int timeOut;
    /**
     * 远程 ZooKeeper服务所在 Linux 服务器的 ip 和 port
     **/
    private static String connectString;
    /**
     * 服务者注册到 ZooKeeper的主 node 节点名称
     **/
    private static String parentServer;

    static {
        connectString = "192.168.1.101:2181,hadoop102:2181,hadoop103:2181";
        timeOut = 2000;
        parentServer = "/servers";
    }

    /**
    * 标志位,用于判断第一次连接 ZooKeeper不执行 getServers 的逻辑代码
    **/
    private boolean flag = false;

    public static void main(String[] args) {

        Client client = new Client();
        // 1. 创建 ZooKeeper对象
        client.getConnection();

        // 2. 获取到 ZooKeeper服务上的所有服务者的注册列表
        client.getServers();

        // 3. 当前消费者跑的业务
        client.business();
    }

    /**
     * 获取 ZooKeeper上注册的所有服务者的列表并打印出来
     * 每次服务者列表的变动,ZooKeeper会自动通知到该消费者,该消费者持有的 ZooKeeper对象里面的监听器就会自动调用
     * getServers 方法
     */
    private void getServers() {
        if (flag) {
            try {
                List<String> children = zooKeeper.getChildren(parentServer, true);
                // lambda 表达式对列表中的每一个元素进行打印操作
                children.forEach(c ->
                {
                    try {
                        System.out.println(new String(zooKeeper.getData(parentServer + "/" + c, false, null))
                                + " 服务者运行着");
                    } catch (InterruptedException | KeeperException e) {
                        e.printStackTrace();
                    }
                });

            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        } else {
            flag = true;
        }
    }

    /**
     * 连接 ZooKeeper服务并初始化 ZooKeeper对象,并且通过 lambda 表达式 创建监听器 Watcher 对象,重写 process 方法
     */
    private void getConnection() {
        try {
            ZooKeeper= new ZooKeeper(connectString, timeOut, watchedEvent -> getServers());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 堵塞模拟一直运行的客户端
     */
    private void business() {
        System.out.println("客户端跑着服务");
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果如下

三台 Linux 运行 ZooKeeper 如下

Cris 的 ZooKeeper 学习笔记

输入 create /servers 'servers' 事先创建好服务者注册列表 node

Cris 的 ZooKeeper 学习笔记

然后启动 Server 程序三次

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记

打开三台 Linux 任意一台的 ZooKeeper 客户端

Cris 的 ZooKeeper 学习笔记

可以发现服务者列表随着服务者程序的启动已经注册上去了

然后启动 Client

Cris 的 ZooKeeper 学习笔记

关掉 101 服务者,然后查看 Client 的console

Cris 的 ZooKeeper 学习笔记

再次查看 Linux 上的 ZooKeeper 节点情况

Cris 的 ZooKeeper 学习笔记

发现随着 101 服务者的下线,服务列表也跟着修改了

然后重新上线 101 服务者

Cris 的 ZooKeeper 学习笔记

Cris 的 ZooKeeper 学习笔记