Zookeeper实现简单的Master选举以及主从数据库数据同步

1.zookeeper的概念与搭建

这类文章很多,所以我就不详细说了,推荐一个详细的技术文章,zookeeper学习。后面我会简单的描述搭建过程。

3.Zookeeper节点

1:stat:状态信息,znode版本信息
2:节点保存的数据信息:与该znode关联的数据
3:children:znode下的子节点信息

2.节点属性

Data:节点保存的数据的内容
1.zxid节点操作的zxid时间戳
2.czxid节点创建的zxid格式的时间戳
3.Mzxid节点修改的zxid时间戳
4.Version节点数据版本号
5.cversion子节点数据版本号
6.Aversion 节点所拥有的acl版本号

4.节点类型

1.临时节点,客户端与服务端断开连接后,节点消失
2.永久节点,永久存在,除非调用删除节点

5.shell操作节点指令

  1. create [-s][-e] path data acl
    其中,-s和-e分别指顺序节点和临时节点,data节点中的数据u,acl用来 进行权限控制

  2. 读取节点

    1.ls path查看节点,与linux系统查看系统目录文件相似
    2.get path 获取指定节点的内容,包括属性信息
    3.ls2 path 显示属性,但是不显示数据

  3. 更新节点

    1.set path data[version] version需正确,默认最新数据版本,版本号要 与当前版本一致

  4. 删除节点:

    1.delete path[version]
    2.Rmr path 递归删除 删除节点以及子节点

  5. quota命令

    1.setquota -n|-b val path
    			 n:表示节点的最大个数
    			 b:表示数据只的最大长度
    			 val:子节点最大个数或数值的最大长度
    			path:节点路径
    		2.listquota节点排列
    

6.Tips

1.节点名称前面一定要加/,表示路径,例如 ls /root(查看根目录下的root节点)

7.二.数据库同步以及Master Ip选举

  1. 功能:

    三台机器每台中都有mysql数据库,但是三台机器只连一台数据库,我们叫这个数据库叫主数据库,三台机器会对主数据库进行数据的crud操作,主数据库如果发生数据变化,那么从数据库也要更新,当主数据库因为网络问题,断开连接,系统会自动从从数据库中选取一个数据库做主数据(master选举),断开的数据库连接上后成为从数据库,并且数据要和当前的主数据库数据一致。

  2. 搭建Zookeeper集群

    1.将zookeeper压缩包解压到集群中的每台计算机
    2.修改conf目录下的zoo.cfg文件(例如)
    Zookeeper实现简单的Master选举以及主从数据库数据同步

    红框表示的是三台主机的IP,server.x表示该zookeeper在集群中的编号,,2888是 flower和leader通信端口, 3888选举端口。

    3.在data目录下新建文件myid,没有后缀名,内容为该主机的zookeeper服务器的 编号,如果编号为1,文件内容就是1。
    4.启动每台主机上的zookeeper服务,点击zkServer.cmd
    5.在其中一台机器上创建一个节点,看从其他机器上是能获得该节点信息
    6.每台主机上装一个mysql数据库,为实现主从结构
    7.数据库开启远程可连接(开启之后就不能用localhost和127.0.0.1访问本地数据 库,需要写明电脑在网络下的IP地址,例如mysql -h10.1.6.120 -uroot -padmin123)
    Mysql -uroot -padmin123;
    use mysql;
    update user set host = ‘%’ where user = ‘root’;
    8.导入zkapi代码,实现简单的leader选举以及主从数据的同步,此代码为java 项目。
    Zookeeper实现简单的Master选举以及主从数据库数据同步
    com.aw.zookeeper.server.leader实现选举
    com.aw.zookeeper.server.mysql实现选举与数据同步
    运行点击client1.java client2.java client3.java,这三个java模拟了zookeer三个客户端
    需要导入的jar包
    Zookeeper实现简单的Master选举以及主从数据库数据同步

  3. 代码思路以及代码
    代码中,没有使用zookeeper原生的api,因为原生的api对节点的监听是一次性的,每次监听触发后,又得给节点重新加监听,头皮发麻。所以引用zkclient 包。

Zookeeper实现简单的Master选举以及主从数据库数据同步
zookeerper集群中的某一个zk客户端创建一个临时节点\master,主要是用来存放Master数据库的IP的,然后其他的机器连接\master中的IP的数据库,即为主数据库,例如当前部署zookeeper1的数据库为主数据库,当因为某些原因导致zookeeper1断开连接,此时临时节点会因为断开连接而消失,其他客户端监到该节点删除的事件后,立马争抢创建\master节点,谁抢到了就将自己的IP放进去,这样新抢的就成了主。之前挂掉的此时如果重新连接到集群中,也只能是从,等待主挂掉之后进行主的争抢。

\sqlinfo 该节点是存放数据库sql语句的,一旦某个数据库进行了数据的操作,将该sql语句放入到\sqlinfo 节点中,当每个客户端监听到该节点的变化,执行该sql语句,这样所有的数据库的数据就会同步。

主要代码如下所示(代码压缩包也会放在****上的)

package com.aw.zookeeper.server.mysql;

import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;

import com.aw.zookeeper.util.Utils;
import com.mysql.jdbc.Statement;
import com.mysql.jdbc.Util;

/**
 * 实现自动选举主数据库并且数据同步
 * @author jinbao_zhang_ext
 *
 */

public class WorkServer implements Serializable {

	    // 记录服务器状态
	    private volatile boolean running = false;

	    private ZkClient zkClient;
	    // Master节点对应zookeeper中的节点路径
	    private static final String MASTER_PATH = "/master";
	    private static final String SQLINFO_PATH="/sqlinfo";
	    // 监听Master节点删除事件
	    private IZkDataListener dataListener;
	    //监听sql节点改动信息
	    private IZkDataListener sqlListener;
	    // 记录当前节点的基本信息
	    private RunningData serverData;
	    //记录当前服务执行的sql语句,提供给所有服务
	    private String sql;
	    // 记录集群中Master节点的基本信息
	    private RunningData masterData;
	    //线程池
	    private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
	    private int delayTime = 5;
	    private Statement statement;
	    //测试用的ip,用来指定该server对应的是哪台数据库
	    private String dataBaseIp;

	    public WorkServer(RunningData rd) {
	        this.serverData = rd; // 记录服务器基本信息
	        this.dataListener = new IZkDataListener() {

	            public void handleDataDeleted(String dataPath) throws Exception {

	                //takeMaster();

	                if (masterData != null && masterData.getName().equals(serverData.getName())){
	                    // 自己就是上一轮的Master服务器,则直接抢
	                    takeMaster();
	                } else {
	                    // 否则,延迟5秒后再抢。主要是应对网络抖动,给上一轮的Master服务器优先抢占master的权利,避免不必要的数据迁移开销
	                    delayExector.schedule(new Runnable(){
	                        public void run(){
	                            try {
									takeMaster();
								} catch (SQLException e) {
									e.printStackTrace();
								}
	                        }
	                    }, delayTime, TimeUnit.SECONDS);
	                }
	               

	            }

	            public void handleDataChange(String dataPath, Object data)
	                    {
	            	if(dataPath.equals(SQLINFO_PATH)){
	            		System.out.println(dataPath+"---changed,the data became "+data);
	            	//statement=Utils.getSt();//实际生产环境获取的statement对象
	            		//将获取到的sql语句执行到本地数据库,实现同步操作
		            	try {
		            		
							statement=(Statement) Utils.getConnByIp(getDataBaseIp()).createStatement();////测试环境获取的statement对象
							statement.executeUpdate(data.toString());
						} catch (SQLException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}finally{
							try {
								statement.close();//测试环境关闭,释放内存,实际环境单例模式,可不用执行后立马关闭
							} catch (SQLException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
							}
						}
	            	}
	            }
	        };
	      
	    }

	    public ZkClient getZkClient() {
	        return zkClient;
	    }

	    public void setZkClient(ZkClient zkClient) {
	        this.zkClient = zkClient;
	    }
	    

	    public String getDataBaseIp() {
			return dataBaseIp;
		}

		public void setDataBaseIp(String dataBaseIp) {
			this.dataBaseIp = dataBaseIp;
		}

		// 启动服务器
	    public void start() throws Exception {
	        if (running) {
	            throw new Exception("server has startup...");
	        }
	        running = true;
	        // 订阅Master节点删除事件
	        zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
	        zkClient.subscribeDataChanges(SQLINFO_PATH, dataListener);
	        // 争抢Master权利
	        takeMaster();

	    }

	    // 停止服务器
	    public void stop() throws Exception {
	        if (!running) {
	            throw new Exception("server has stoped");
	        }
	        running = false;
	        
	        delayExector.shutdown();
	        // 取消Master节点事件订阅
	        zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
	        // 释放Master权利
	        releaseMaster();

	    }

	    // 争抢Master
	    private void takeMaster() throws SQLException  {
	        if (!running)
	            return;

	        try {
	            // 尝试创建Master临时节点
	            zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
	            masterData = serverData;
	           
	            System.out.println("master database ip 为"+masterData.getName()+"接下服务连接master该数据库");

	            // 作为演示,我们让服务器每隔5秒释放一次Master权利
	            /*delayExector.schedule(new Runnable() {            
	                public void run() {
	                    // TODO Auto-generated method stub
	                    if (checkMaster()){
	                        releaseMaster();
	                    }
	                }
	            }, 5, TimeUnit.SECONDS);*/
	            
	        } catch (ZkNodeExistsException e) { // 已被其他服务器创建了
	            // 读取Master节点信息
	            RunningData runningData = zkClient.readData(MASTER_PATH, true);
	            if (runningData == null) {
	                takeMaster(); // 没读到,读取瞬间Master节点宕机了,有机会再次争抢
	            } else {
	            	//slave活过来从数据要更新与主数据保存一致
	            	masterData = runningData;
	            	//创建master数据库的statement对象
	            	//获取本地(从)数据库的statement对象
	            	Statement masterStatement=null;
	            	Statement statement=null;
	            	ResultSet rs=null;
	                try {
	                	masterStatement=(Statement) Utils.getConnByIp(masterData.getName()).createStatement();
						statement=(Statement) Utils.getConnByIp(getDataBaseIp()).createStatement();
						//清空本地数据库
						statement.execute("delete from testsyn");
						 rs=masterStatement.executeQuery("select * from testsyn");
						while(rs.next()){
							int no=rs.getInt("no");
							String name=rs.getString("name");
							System.out.println("no is "+no+",name is"+name);
							String sql ="insert into testsyn (no,name) values ("+no+",'"+name+"')";
							statement.addBatch(sql);
						} 
						statement.executeBatch();
	                
					} catch (SQLException e1) {
						e1.printStackTrace();
	            }finally{
	            	if(masterStatement!=null){
	            		masterStatement.close();
	            	}
	            	if(statement!=null){
	            		statement.close();
	            	}
	            	if(rs!=null){
	            		rs.close();
	            	}
	            }
	           }
	        }
	    }

	    // 释放Master权利
	    private void releaseMaster() {
	        if (checkMaster()) {
	            zkClient.delete(MASTER_PATH);
	        }
	    }

	    // 检测自己是否为Master
	    private boolean checkMaster() {
	        try {
	            RunningData eventData = zkClient.readData(MASTER_PATH);
	            masterData = eventData;
	            if (masterData.getName().equals(serverData.getName())) {
	                return true;
	            }
	            return false;
	        } catch (ZkNoNodeException e) {
	            return false; // 节点不存在,自己肯定不是Master了
	        } catch (ZkInterruptedException e) {
	            return checkMaster();
	        } catch (ZkException e) {
	            return false;
	        }
	    }
	    //发送sql到zookeeper服务器上
	    public void sendSql(String sql){
	    	if(zkClient.exists(SQLINFO_PATH)){
	    		zkClient.writeData(SQLINFO_PATH, sql);	
	    		System.out.println(SQLINFO_PATH +"is exist");
	    	}else{
	    		//创建永久节点
	    		zkClient.create(SQLINFO_PATH, sql, CreateMode.PERSISTENT);
	    	}	    	
	    }
	    

	}



package com.aw.zookeeper.server.mysql;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.ZooKeeper;

public class Client1 {
	private  static String ZOOKEEPER_SERVER="10.1.6.120";
	
	public static void main(String[] args) throws Exception {
		ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 50000, 50000, new SerializableSerializer());
		RunningData ip=new RunningData();
		ip.setName(ZOOKEEPER_SERVER);
		WorkServer server =new WorkServer(ip);
		//这是本地服务器IP,实际生产环境不需要设置
		server.setDataBaseIp(ZOOKEEPER_SERVER);
		server.setZkClient(client);
		server.start();
		for(int i=1;i<=100;i++){
			System.out.println("敲回车添加数据!\n");
			new BufferedReader(new InputStreamReader(System.in)).readLine();
			server.sendSql("insert into testsyn (no,name) values("+i+",'zhangjinbao');");
		
		}
		System.out.println("敲回车退出程序!\n");
		new BufferedReader(new InputStreamReader(System.in)).readLine();
		
        server.stop();
        server.getZkClient().close();
	}

}