Zookeeper分布式服务协调组件
1.简介
Zookeeper是一个分布式服务协调组件,是Hadoop、Hbase、Kafka重要的依赖组件,它是一个为分布式应用提供一致性服务的组件。
Zookeeper的目标就是封装好复杂易出错的服务,为使用者提供高效、稳定的服务。
Zookeeper的使用场景:
1.Hadoop、Hbase、Kafka的依赖组件。
2.作为注册中心,用于维护服务列表。
3.作为项目的配置中心,将一些重要的可以动态配置的信息放入zk中,利用zk的通知机制,当状态发生改变时通知客户端,将其变量放入静态变量中。
4.作为本地缓存的更新策略,当信息更新时更新节点的值,利用zk的通知机制,各个应用获取到节点的值,再从数据库中查询,然后更新到本地缓存。
5.中间件的高可用性。
2.模型
2.1 Zookeeper的文件系统
Zookeeper维护了一个类似文件系统的数据结构,有根目录 (/) 和若干个子目录 (树形结构 , 与Linux类似 )
*每个目录都称为一个znode,每个znode都包含自身节点的数据且每个znode下可以包含多个znode。
*在创建znode时必须指定znode的数据,可以为null。
*znode下的数据有版本号,当进行更新操作时版本号会+1。
*当使用JAVA进行更新和删除操作时,需要传递版本号,其内部进行CAS判断,当且仅当传递的版本号与当前节点的版本号相同时,才进行操作(并非每个znode下包含多个版本的数据,传递版本号只是用来做CAS,默认值为-1,表示不进行CAS判断)
*删除znode时,若该znode包含子znode,那么必须先删除所有的子znode,否则无法删除。
znode类型
持久化节点:无论客户端的连接是否断开,该节点依然存在。
持久化节点并顺序编号:在持久化节点的基础上,Zookeeper动态的为znode进行自动编号,即创建/p节点,那么Zookeeper将把其节点命名为/p1,当再次创建/p节点,那么Zookeeper将把该节点命名为/p2。
临时节点:当客户端的连接断开,临时节点及其节点中的数据将会被删除。
临时节点并顺序编号:在临时节点的基础上,Zookeeper动态的为znode进行自动编号,与持久化节点并顺序编号的区别是,临时节点并顺序编号会在客户端断开连接之后会自动删除节点以及节点中的数据。
2.2 Zookeeper的通知机制
客户端可以监听它关心的节点,当目标节点发生变化时 (数据版本号改变、被删除、子目录节点增加和删除),Zookeeper会通知客户端,客户端再作出相应的处理。
*zk并不是根据节点的值改变而触发通知的,而是根据节点中数据的版本号。
*当节点中的值重复时,但由于数据的版本号发生改变,因此仍然可以通过通知机制通知客户端。
2.3 Zookeeper集群的一致性同步
Zookeeper一般是通过集群的方式进行使用,即多台Zookeeper服务构成一个有关系的组。
当搭建了一个Zookeeper集群,Zookeeper会根据选举算法,从多个Zookeeper节点中选取一个作为Leader,剩余的节点作为Follower,Leader会与各个Follower建立一个有效的长连接,保证各个节点的通信正常 (每台服务器都有可能被选取为Leader)
当Zookeeper集群搭建完成后,就可以启动很多个客户端与zk节点进行连接 (长连接方式,保证客户端与服务器能有效持久的连接)
当某个节点收到修改的操作时,首先会把请求转发给Leader,Leader内有处理机制,它会操作修改并且同步操作给所有的Follower节点。
*一旦选取的Leader节点宕机,则会重新组织Zookeeper集群,选取新的Leader,重新与各个节点建立连接 (重新选取的时间很短,大概200ms)
3.Zookeeper的使用
3.1 安装
*由于Zookeeper是由java语言编写的,因此在安装Zookeeper前需要安装好JDK,并且配置环境变量JAVA_HOME
从Zookeeper官网下载zk进行解压安装:
3.2 目录结构
bin目录
zkEnv.sh:用于配置zk服务启动时的环境变量 (包括加载配置文件的路径等)
zkServer.sh:用于启动zk服务,默认监听2181端口。
zkCli.sh:用于启动zk客户端。
zookeeper.out:用于存放zk运行时的日志。
conf目录
log4j.properties文件:zk运行时的日志配置文件,默认日志信息都将打印到bin目录下的zookeeper.out文件 (当使用Zookeeper遇到异常时应该查看此文件下的内容)
zoo_sample.conf文件:zk服务的配置文件,由Zookeeper官方提供 (默认zk服务启动时将加载conf目录下的zoo.cfg配置文件)
3.3 配置文件
Zookeeper启动时默认加载conf目录下的zoo.cfg配置文件,因此将conf目录下的zoo_sample.conf配置文件更名为zoo.cfg。
配置文件
#基础配置
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/Zookeeper/Zookeeper-3.4.6/zkdata
dataLogDir=/usr/Zookeeper/Zookeeper-3.4.6/zklog
clientPort=2181
autopurge.purgeInterval=1
tickTime:initLimit、syncLimit属性的时间单位,值是毫秒。
initLimit:Zookeeper集群搭建前所允许的初始化时间。
syncLimit:Leader发送心跳给Follower,Follower向Leader回复心跳这一过程所允许的最大时长 (rtt,往返时间),一旦超过了这个时间,则Leader认为该Follower宕机。
dataDir:Zookeeper快照日志的存放目录。
dataLogDir:Zookeeper事务日志的存放目录。
clientPort:Zookeeper服务监听的端口,默认为2181。
*当其中一台zk服务启动后,剩余的zk服务必须在initLimit规定的时间内全都启动,否则zk进行集群的搭建时会认为未启动的zk服务已经失效。
*如果不配置dataLogDir,那么Zookeeper的事务日志将写入到dataDir目录下 (会严重影响zk的性能)
3.4 Zookeeper的启动
使用zkServer.sh命令启动Zookeeper服务。
使用jps命令查询zk进程是否启动成功,当出现QuorumPeerMain表示zk启动成功。
3.5 Zookeeper集群搭建
1.修改配置文件,添加zk集群配置
#基础配置
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/Zookeeper/Zookeeper-3.4.6/zkdata dataLogDir=/usr/Zookeeper/Zookeeper-3.4.6/zklog clientPort=2181 autopurge.purgeInterval=1
#集群配置 server.1=192.168.1.119:2888:3888 server.2=192.168.1.122:2888:3888 server.3=192.168.1.125:2888:3888
在conf文件下使用server.标识属性配置zk集群,使多个zk服务构成一个组 (标识必须为整数)
server.本机zk标识 = zk服务地址:leader和follower之间的通信端口:leader选举端口
server.其他zk标识 = zk服务地址:leader和follower之间的通信端口:leader选举端口
*标识与zk服务进行绑定,因此同一个集群下的zk服务的标识不能相同。
*leader和follower之间的通信端口默认是2888,leader选举端口默认是3888。
2.在快照日志目录下创建myid文件,文件中的值是本台zk服务的唯一标识(给节点编号)
#将1输入到myid文件中
echo "1" > myid
*需要修改要构成集群的其他zk节点的配置文件以及设置其myid文件。
3.启动Zookeeper服务
*在 initLimit * tickTime的时间内启动集群中的所有zk节点。
4.查看Zookeeper的状态
注意事项
*搭建Zookeeper集群时务必遵循2n+1个节点,因为根据Zookeeper的工作原理,只要有大于一半的节点存活,则Zookeeper集群就能够对外提供服务。
*搭建zk集群时需关闭每台zk服务器上的防火墙或者开放对应的端口,否则集群中的zk间无法进行通讯。
*zk集群在高负荷的工作时会产生大量的事务日志,如果日志长期不进行清理容易将分区中的空间占满最终导致zk服务无法运行,因此需要定期清理zk产生的事务日志 (可以配合Linux的crontab命令设置每天定时去执行清除日志文件的脚本)
3.6 Zookeeper客户端操作
3.7 Java中操作Zookeeper
可以通过Apache提供的Zookeeper API对zk进行操作 (提供基本功能),也可以使用Apache提供的Curator框架操作zk (提供更全面的功能)
*Curator框架除了基本的节点添加、删除、修改、查询,监听节点功能外,还提供了session超时重连、主从选举、分布式计算器、分布式锁等等适用于各种复杂的zk场景的API封装。
1.导入Maven依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
<type>pom</type>
</dependency>
2.建立连接
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException
connectString:zookeeper server列表,多个以逗号隔开。
sessionTimeout:指定连接Zookeeper的超时时间。
watcher:事件回调接口。
*ZooKeeper实例将从服务列表中选择一个server建立连接,若连接失败则选择另外一个server重新进行连接。
*ZooKeeper实例是通过异步的方式建立连接,当连接建立完毕后回调指定Watcher的process方法,因此程序为了保证同步建立连接,可以使用JAVA提供的CountDownLatch同步辅助类进行控制。
3.调用Zookeeper提供的API
//创建节点,指定节点路径、数据、节点的类型 public String create(final String path, byte data[], List<ACL> acl,CreateMode createMode) //获取子节点 public List<String> getChildren(final String path, boolean watch) //判断节点是否存在 public Stat exists(String path, boolean watch) //获取节点中的数据 public byte[] getData(String path, boolean watch, Stat stat) //设置或更新节点中的数据 public Stat setData(final String path, byte data[], int version) //删除节点 public void delete(final String path, int version)
*创建节点时需要指定节点的类型,Apache Zookeeper API中提供了CreateMode枚举类,用于指定节点的类型。
CreateMode.PERSISTENT:持久化节点
CreateMode.PERSISTENT_SEQUENTIAL:持久化节点并顺序编号
CreateMode.EPHEMERAL:临时节点
CreateMode.EPHEMERAL_SEQUENTIAL:临时节点并顺序编号
*当进行更新和删除操作时,需要传递版本号,其内部进行CAS判断,当且仅当传递的版本号与当前节点的版本号相同时,才进行操作(并非每个znode下包含多个版本的数据,传递版本号只是用来做CAS,默认值-1,表示不进行CAS判断)
*Apache Zookeeper API中有很多方法都支持Watcher类型参数,Watcher可用于监听事件的发生以及连接状态的改变,包括节点的创建、删除、节点中数据的改变、节点的子节点发生改变等事件,失去连接、异步连接、认证失败、只读连接、连接过期等连接状态。
完整示例
/** * @Auther: ZHUANGHAOTANG * @Date: 2018/11/12 14:55 * @Description: */ public class ZKUtils { /** * 日志输出 */ private static Logger logger = LoggerFactory.getLogger(ZKUtils.class); /** * ZK服务列表 */ private static final String URLS = "192.168.1.80:2181,192.168.1.81:2181,192.168.1.83:2181"; /** * 连接Zookeeper的超时时长(单位:毫秒) */ private static final int SESSION_TIMEOUT = 3000; /** * Zookeeper连接对象 */ private static ZooKeeper zk = null; static { try { CountDownLatch countDownLatch = new CountDownLatch(1);//锁存器(同步辅助类) zk = new ZooKeeper(URLS, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (Event.KeeperState.SyncConnected == event.getState()) { countDownLatch.countDown();//倒数器-1 } } }); countDownLatch.await(); } catch (Exception e) { logger.info("Zookeeper获取连接失败,{}", e); } } /** * 创建节点 * * @param path * @param data * @param createMode * @throws Exception */ public static void createPath(String path, String data, CreateMode createMode) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } if (!exists(path)) { zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode); } } /** * 获取子节点 * * @param path * @return * @throws Exception */ public static List<String> getSubNode(String path) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } return zk.getChildren(path, false); } /** * 判断节点是否存在 * * @param path * @return * @throws Exception */ public static boolean exists(String path) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } if (zk.exists(path, false) != null) { return true; } return false; } /** * 获取节点中的数据 * * @param path * @return * @throws Exception */ public static String getData(String path) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } return new String(zk.getData(path, false, null)); } /** * 更新节点中的数据 * * @param path * @param data * @throws Exception */ public static void setData(String path, String data) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } zk.setData(path, data.getBytes(), -1); } /** * 删除节点 * * @param path * @throws Exception */ public static void deletePath(String path) throws Exception { if (StringUtils.isBlank(path)) { throw new Exception("path is null"); } zk.delete(path, -1); } }