4. ZooKeeper in Practice

Distributed Installation and Deployment

  1. Cluster plan
    Deploy ZooKeeper on the three nodes hadoop-100, hadoop-101, and hadoop-102
  2. Extract the ZooKeeper tarball into /opt/module
  3. Create a zkData directory under /opt/module/zookeeper-3.4.10
  4. Create a myid file in the zkData directory
  5. Rename zoo_sample.cfg in the conf directory to zoo.cfg
  6. Distribute /opt/module/zookeeper-3.4.10 from hadoop-100 to hadoop-101 and hadoop-102
    [[email protected] module]$ xsync zookeeper-3.4.10/
  7. Configure the server IDs
    On hadoop-100, hadoop-101, and hadoop-102, set the contents of the myid file to 1, 2, and 3 respectively
  8. Configure the zoo.cfg file
    On hadoop-100, set the data storage path
    dataDir=/opt/module/zookeeper-3.4.10/zkData
    and add the following entries (a fuller zoo.cfg sketch follows this list)
    server.1=hadoop-100:2888:3888
    server.2=hadoop-101:2888:3888
    server.3=hadoop-102:2888:3888
    Reading the configuration parameters
    server.A=B:C:D
    A is a number identifying which server this is
    In cluster mode each server has a myid file in its dataDir directory containing exactly one value, A. When ZooKeeper starts, it reads this file and compares the value against the entries in zoo.cfg to work out which server it is
    B is the host name or IP address of the server
    C is the port this server uses to exchange data with the cluster Leader
    D is the port used for leader election: if the current Leader goes down, the servers talk to one another over this port to elect a new Leader
  9. Sync the zoo.cfg file to the other nodes
    [[email protected] conf]$ xsync zoo.cfg
  10. Operate the cluster
    Start ZooKeeper on each node
    [[email protected] zookeeper-3.4.10]$ bin/zkServer.sh start
    [[email protected] zookeeper-3.4.10]$ bin/zkServer.sh start
    [[email protected] zookeeper-3.4.10]$ bin/zkServer.sh start
    Check the status on each node
    [[email protected] zookeeper-3.4.10]$ bin/zkServer.sh status
    [[email protected] zookeeper-3.4.10]$ bin/zkServer.sh status
    [[email protected] zookeeper-3.4.10]$ bin/zkServer.sh status
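
A complete zoo.cfg for this cluster would look roughly like the sketch below; only dataDir and the server.* lines come from the steps above, while the timing values are the defaults shipped in zoo_sample.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper-3.4.10/zkData
clientPort=2181
server.1=hadoop-100:2888:3888
server.2=hadoop-101:2888:3888
server.3=hadoop-102:2888:3888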

Client Command-Line Operations

Command syntax      Description
help                Show all available commands
ls path [watch]     List the children contained in the given znode
ls2 path [watch]    List the children and also show the znode's stat data (e.g. update counts)
create              Create a znode; -s appends a sequence number, -e makes it ephemeral (removed when the client restarts or the session times out)
get path [watch]    Read a znode's data
set                 Set a znode's data
stat                Show a znode's status
delete              Delete a znode
rmr                 Delete a znode recursively
  1. Start the client
    [[email protected] zookeeper-3.4.10]$ bin/zkCli.sh
  2. Show all available commands
    [zk: localhost:2181(CONNECTED) 0] help
  3. List the contents (children) of the root znode
    [zk: localhost:2181(CONNECTED) 1] ls /
  4. View detailed data for the current node
    [zk: localhost:2181(CONNECTED) 2] ls2 /
  5. Create two ordinary (persistent) nodes
    [zk: localhost:2181(CONNECTED) 3] create /test "hello Zookeeper"
    [zk: localhost:2181(CONNECTED) 4] create /test/hello "hello i use zookeeper"
  6. Read the nodes' values
    [zk: localhost:2181(CONNECTED) 5] get /test
    [zk: localhost:2181(CONNECTED) 6] get /test/hello
  7. Create an ephemeral node
    create -e /test/ephemeral "Ephemeral data"
    Check it from the current client
    [zk: localhost:2181(CONNECTED) 8] ls /test
    Quit the current client
    [zk: localhost:2181(CONNECTED) 9] quit
    Restart the client
    [[email protected] zookeeper-3.4.10]$ bin/zkCli.sh
    Check again: the ephemeral node has been removed (its session ended when the client quit)
    ls /test
  8. Create sequential (numbered) nodes
    First create an ordinary parent node
    [zk: localhost:2181(CONNECTED) 3] create /test/sdata "sdata"
    Then create sequential nodes
    [zk: localhost:2181(CONNECTED) 5] create -s /test/sdata/data "hello"
    [zk: localhost:2181(CONNECTED) 5] create -s /test/sdata/data "hello"
    [zk: localhost:2181(CONNECTED) 5] create -s /test/sdata/data "hello"
    If there were no sequential nodes before, numbering starts at 0 and increases by one each time. If the parent node already contains 2 nodes, new sequential nodes are numbered starting from 2, and so on
  9. Modify a node's data value
    [zk: localhost:2181(CONNECTED) 10] set /test/hello "hhh"
  10. Watch a node's value for changes
    On hadoop-101, register a watch on the data of the /test/hello node
    [zk: localhost:2181(CONNECTED) 0] get /test/hello watch
    On hadoop-100, modify the data of /test/hello
    [zk: localhost:2181(CONNECTED) 11] set /test/hello "hhhh"
    On hadoop-101, observe the data-change notification that arrives
    [zk: localhost:2181(CONNECTED) 1]
    WATCHER::
    WatchedEvent state:SyncConnected type:NodeDataChanged path:/test/hello
  11. Watch a node's children for changes (path changes)
    On hadoop-101, watch the children of the /test node
    [zk: localhost:2181(CONNECTED) 2] ls /test watch
    On hadoop-100, create a child node under /test
    [zk: localhost:2181(CONNECTED) 12] create -s /test/watch "hhh"
    On hadoop-101, observe the child-change notification that arrives
    [zk: localhost:2181(CONNECTED) 3]
    WATCHER::
    WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/test
  12. Delete a node
    [zk: localhost:2181(CONNECTED) 14] delete /test/watch0000000003
  13. Delete a node recursively
    [zk: localhost:2181(CONNECTED) 15] rmr /test
  14. Check a node's status
    stat /

API Usage

Environment Setup

Create a Maven project
Add the following dependencies to the pom file

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>
</dependencies>

In the resources directory, add a log4j.properties file (ZooKeeper 3.4.10 pulls in log4j 1.x transitively, which is what this file configures)

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

Create a ZooKeeper client

    private static String connectString = "hadoop-100:2181,hadoop-101:2181,hadoop-102:2181";
    private static int sessionTime = 2000;
    private ZooKeeper zkClient = null;

    @Test
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTime, null);
    }
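
The ZooKeeper constructor returns immediately, before the session to the ensemble is actually established. A minimal sketch of blocking until the connection is up, reusing the connectString and sessionTime fields above (the helper name connectAndWait and the CountDownLatch wait are made up for illustration):

    // needs java.util.concurrent.CountDownLatch in addition to the ZooKeeper imports
    public ZooKeeper connectAndWait() throws IOException, InterruptedException {
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();   // session established
                }
            }
        });
        connected.await();   // block until the SyncConnected event arrives
        return zk;
    }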

Create a child node

    @Test
    public void create() throws KeeperException, InterruptedException {
        // arg 1: path of the node to create; arg 2: node data; arg 3: ACL (permissions); arg 4: node type
        String s = zkClient.create("/test", "testApi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
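
The CLI flags -e and -s from the previous section map to CreateMode values in the API. A short sketch on the same zkClient (the paths /test/ephemeralApi and /test/seq- are made up for illustration; /test must already exist):

    @Test
    public void createEphemeralAndSequential() throws KeeperException, InterruptedException {
        // ephemeral node: removed automatically when this client's session closes or expires
        zkClient.create("/test/ephemeralApi", "tmp".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // sequential node: the server appends a monotonically increasing 10-digit counter to the name
        String seqPath = zkClient.create("/test/seq-", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("created: " + seqPath);
    }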

Get child nodes and watch them for changes

    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("watchedEvent.getType() = " + watchedEvent.getType());
                System.out.println("watchedEvent.getPath() = " + watchedEvent.getPath());
                try {
                    zkClient.getChildren("/", true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    @Test
    public void getDataAndWatch() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren("/", true);
        for(String data : children) {
            System.out.println("data = " + data);
        }
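        // keep the test alive so the watcher above can keep receiving callbacks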
        Thread.sleep(Long.MAX_VALUE);
    }

Check whether a znode exists

    @Test
    public void exist() throws KeeperException, InterruptedException {
        Stat exists = zkClient.exists("/test/apii", false);
        System.out.println(exists == null ? "not exist" : "exist");
    }
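
For completeness, a sketch of the API counterparts of the CLI get, set and delete commands, reusing the /test nodes created earlier (version -1 means "skip the version check"):

    @Test
    public void getSetDelete() throws KeeperException, InterruptedException {
        // get /test
        byte[] data = zkClient.getData("/test", false, new Stat());
        System.out.println("data = " + new String(data));
        // set /test "newValue"
        zkClient.setData("/test", "newValue".getBytes(), -1);
        // delete /test/hello  (the node must have no children)
        zkClient.delete("/test/hello", -1);
    }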

Case Study: Watching Servers Dynamically Come Online and Go Offline

Requirement

In a distributed system there can be multiple master-node servers that come online and go offline dynamically, and every client must be able to sense in real time which master servers are currently online

Requirement Analysis

(Figure: servers dynamically going online and offline.) Each server registers itself by creating an ephemeral sequential node under /servers when it starts; clients list the children of /servers with a watch, so whenever a server joins or leaves (its session ends) they are notified and refresh the server list.

Implementation

  1. First create the /servers node on the cluster
    [zk: localhost:2181(CONNECTED) 10] create /servers "servers"
  2. Server-side code
public class DistributeServer {

    private static String connectString = "hadoop-100:2181,hadoop-101:2181,hadoop-102:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";
    
    // create the client connection to zk
    public void getConnect() throws IOException{
        
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
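                // nothing to handle here: the server side only registers itself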

            }
        });
    }
    
    // register this server
    public void registServer(String hostname) throws Exception{
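        // EPHEMERAL_SEQUENTIAL: the znode vanishes when this server's session ends and gets a unique numeric suffix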

        String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        
        System.out.println(hostname +" is online "+ create);
    }
    
    // business logic
    public void business(String hostname) throws Exception{
        System.out.println(hostname+" is working ...");
        
        Thread.sleep(Long.MAX_VALUE);
    }
    
    public static void main(String[] args) throws Exception {
        
        // 1. get a zk connection
        DistributeServer server = new DistributeServer();
        server.getConnect();
        
        // 2. use the zk connection to register this server's info
        server.registServer(args[0]);
        
        // 3. start the business logic
        server.business(args[0]);
    }
}

  3. Client-side code
public class DistributeClient {
    private static String connectString = "hadoop-100:2181,hadoop-101:2181,hadoop-102:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";

    // create the client connection to zk
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            @Override
            public void process(WatchedEvent event) {

                // the watch has fired; fetch the list again, which also re-registers the watch
                try {
                    getServerList();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    // fetch the list of online servers
    public void getServerList() throws Exception {
        
        // 1. get the children of /servers and set a watch on the parent node
        List<String> children = zk.getChildren(parentNode, true);

        // 2. list for collecting the server info
        ArrayList<String> servers = new ArrayList<>();
        
        // 3. walk all child nodes and read the host name stored in each
        for (String child : children) {
            byte[] data = zk.getData(parentNode + "/" + child, false, null);

            servers.add(new String(data));
        }

        // 4. print the server list
        System.out.println(servers);
    }

    // business logic
    public void business() throws Exception{

        System.out.println("client is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {

        // 1. get a zk connection
        DistributeClient client = new DistributeClient();
        client.getConnect();

        // 2. get the children of /servers and build the server list from them
        client.getServerList();

        // 3. start the business process
        client.business();
    }
}
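
  4. A quick way to verify (a suggested test flow for the classes above): start DistributeClient first, then run DistributeServer with a host name argument such as hadoop-102; the client prints the refreshed list of online servers. Stopping the server process ends its session, its ephemeral node under /servers disappears, and the client prints the list again without that host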