Day16. High-Performance RPC Design, Study Notes 4 - ZooKeeper (repost)
ZooKeeper
ZooKeeper is a distributed, open-source coordination service designed for distributed applications. It helps solve common problems in distributed systems: configuration synchronization, leader election, distributed locks, and service naming/grouping. Keep in mind that although ZooKeeper can help with these problems, it does not mean you write no code: to solve them you must make deliberate use of ZooKeeper's node semantics in your own programs.
[What is a distributed lock? At any given moment, only one of several processes may operate on a resource. ZooKeeper coordinates this by ordering the sessions that request the lock; when the holder finishes, it releases the lock on its own.]
ZooKeeper Features
The ZooKeeper service maintains an in-memory namespace that resembles a directory tree. These "directories" and "files" are collectively called znodes. A znode only resembles an operating-system directory in its hierarchical relationships, because every znode can also store data.
ZooKeeper maintains its nodes in a tree-like structure, and each node is identified and accessed by a path. In addition, each node carries metadata: data, data length, creation time, modification time, and so on. Because a node both holds data and serves as a path label, a ZooKeeper node can be seen as a file and as a directory at the same time; it has the characteristics of both. Such a node is conventionally called a znode.
Four Node Types | One Feature
Four node types: persistent, ephemeral, persistent sequential, ephemeral sequential.
Persistent (PERSISTENT): the node is not destroyed when the session that created it ends [stores data, and the data persists].
Ephemeral (EPHEMERAL): the node is destroyed when the session that created it ends [stores data, but the node disappears with the session].
The two sequential kinds (PERSISTENT_SEQUENTIAL / EPHEMERAL_SEQUENTIAL): the system automatically maintains a creation-order sequence number for these nodes, recording the order in which children were created under a parent [ordered; the order gives nodes priority to a resource].
Note: ephemeral nodes and ephemeral sequential nodes cannot have children [interview point].
One feature: ZooKeeper supports registering watches on nodes. When a watched node changes (its data or its set of children), ZooKeeper pushes the change notification to every client subscribed to that node.
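The sequence numbering above is what makes "priority to a resource" work: the server appends a zero-padded counter to each sequential node's name, so sorting the sibling names lexicographically recovers creation order. A small illustrative sketch, with made-up node names rather than names read from a live server:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SequenceOrderDemo {
    public static void main(String[] args) {
        // Hypothetical child names, as ZooKeeper would return them after three
        // EPHEMERAL_SEQUENTIAL creates under one parent (10-digit counter suffix).
        List<String> children = new ArrayList<>(Arrays.asList(
                "lock-0000000003", "lock-0000000001", "lock-0000000002"));
        // Because the counter is zero-padded, lexicographic order equals
        // creation order, so sorting tells each client its place in the queue.
        Collections.sort(children);
        System.out.println(children);
        System.out.println(children.get(0)); // the earliest node gets the resource first
    }
}
```

This is exactly the ordering that the distributed-lock and leader-election recipes later in these notes rely on.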
ZooKeeper Cluster (CP: consistency and partition tolerance)
Cluster setup
[root@CentOSA ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
[root@CentOSA ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000
dataDir=/root/zkdata
clientPort=2181
initLimit=5
syncLimit=2
server.1=CentOSA:2887:3887 # data-sync (quorum) port : leader-election port
server.2=CentOSB:2887:3887
server.3=CentOSC:2887:3887
[root@CentOSA ~]# mkdir /root/zkdata
# repeat the extraction, zoo.cfg, and zkdata steps on CentOSB and CentOSC, then give each host its own id:
[root@CentOSA ~]# echo 1 > /root/zkdata/myid
[root@CentOSB ~]# echo 2 > /root/zkdata/myid
[root@CentOSC ~]# echo 3 > /root/zkdata/myid
[root@CentOSA ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh start zoo.cfg   # run on every host
[root@CentOSA ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh status zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: [follower|leader] # one of CentOSA/B/C reports leader; the other two report follower
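The status output shows one leader and two followers. Three servers is the smallest useful ensemble because ZooKeeper commits a write only after a majority (n/2 + 1) of servers acknowledge it; a 3-node cluster therefore survives one failure, and a 5-node cluster survives two. The arithmetic, as a sketch (the class and method names are illustrative):

```java
public class QuorumDemo {
    // A write commits only after a majority of the ensemble acknowledges it.
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // The ensemble stays available as long as a majority of servers is alive.
    static int tolerableFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n : new int[]{3, 4, 5}) {
            System.out.println(n + " servers -> majority " + majority(n)
                    + ", tolerates " + tolerableFailures(n) + " failure(s)");
        }
    }
}
```

Note that 4 servers tolerate no more failures than 3 (both tolerate one), which is why ensembles are usually sized with an odd number of servers.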
Standalone setup
[root@CentOS ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
[root@CentOS ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000 # session expiry is detected within 2*tickTime
dataDir=/root/zkdata # ZooKeeper data directory
clientPort=2181 # client service port
[root@CentOS ~]# mkdir /root/zkdata
[root@CentOS zookeeper-3.4.6]# ./bin/zkServer.sh start zoo.cfg # start
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@CentOS zookeeper-3.4.6]# ./bin/zkServer.sh status zoo.cfg # check status
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
[root@CentOS ~]# jps
1742 QuorumPeerMain
[root@CentOS zookeeper-3.4.6]# ./bin/zkServer.sh stop zoo.cfg # stop
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
ZooKeeper node operation commands
[root@CentOS zookeeper-3.4.6]# ./bin/zkCli.sh -server CentOS:2181
Connecting to CentOS:2181
[zk: CentOS:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
set path data [version]
ls path [watch]
ls2 path [watch]
delete path [version]
rmr path # recursive delete
get path [watch]
create [-s] [-e] path data acl
quit
close
connect host:port # close/connect: close or reopen the connection
[zk: CentOS:2181(CONNECTED) 5] ls /
[baizhi, zookeeper]
[zk: CentOS:2181(CONNECTED) 6] ls2 /
[baizhi, zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2
[zk: CentOS:2181(CONNECTED) 7] set /baizhi "nihao"
cZxid = 0x2
ctime = Wed Nov 14 18:50:18 CST 2018
mZxid = 0x3
mtime = Wed Nov 14 18:51:08 CST 2018
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
[zk: CentOS:2181(CONNECTED) 8] get /baizhi
"nihao"
cZxid = 0x2
ctime = Wed Nov 14 18:50:18 CST 2018
mZxid = 0x3
mtime = Wed Nov 14 18:51:08 CST 2018
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
[zk: CentOS:2181(CONNECTED) 9] delete /baizhi
[zk: CentOS:2181(CONNECTED) 10] ls /
[zookeeper]
[zk: CentOS:2181(CONNECTED) 11] create -e /baizhi/enode ''
Node does not exist: /baizhi/enode
[zk: CentOS:2181(CONNECTED) 12] create /baizhi ''
Created /baizhi
[zk: CentOS:2181(CONNECTED) 13] create -e /baizhi/enode ''
Created /baizhi/enode
Connecting to ZooKeeper from Java (zkclient | curator-framework)
Older code used zkclient; newer code uses curator-framework as the ZooKeeper client, which also makes recipes such as leader election and distributed locks straightforward.
- Dependencies
<!-- base client library for accessing ZooKeeper -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.7.1</version>
</dependency>
<!-- Curator recipes: extra features such as distributed locks and leader election -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.1</version>
</dependency>
<!-- other dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
Reference: http://curator.apache.org/getting-started.html
Creating a client
private CuratorFramework client;

@Before
public void before(){
    String servers = "CentOS:2181";
    RetryPolicy retryPolicy = new RetryNTimes(3, 1000); // retry 3 times, 1 s apart
    //new RetryUntilElapsed(10000, 1000); // retry every 1 s, for at most 10 s in total
    //new ExponentialBackoffRetry(1000, 3); // up to 3 retries with growing intervals
    client = CuratorFrameworkFactory.newClient(servers, retryPolicy);
    client.start(); // the client must be started before use
}

@After
public void after(){
    client.close();
}
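The retry policies differ mainly in how they compute the next sleep interval. The sketch below imitates the shape of an exponential backoff policy such as ExponentialBackoffRetry, where the randomized sleep bound roughly doubles per retry; this is not Curator's source, and the class and method names are made up:

```java
import java.util.Random;

public class BackoffSketch {
    // Approximation of exponential backoff: the upper bound on the sleep
    // doubles with each retry, and the actual sleep is randomized to avoid
    // many clients retrying in lockstep.
    static long sleepMs(int baseSleepMs, int retryCount, Random rnd) {
        return (long) baseSleepMs * Math.max(1, rnd.nextInt(1 << (retryCount + 1)));
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        for (int retry = 0; retry < 5; retry++) {
            System.out.println("retry " + retry + ": sleep " + sleepMs(1000, retry, rnd) + " ms");
        }
    }
}
```

With a base of 1000 ms, retry 0 always sleeps 1000 ms, while retry 4 may sleep anywhere from 1 to 31 seconds, which is why a bounded variant is usually preferred in production.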
Common operations
public class CuratorFrameworkDemo {
    private CuratorFramework curatorFramework;

    @Before
    public void before(){
        String servers = "CentOS:2181";
        /**
         * RetryPolicy: retry strategies
         * new RetryNTimes(3, 1000)             // number of retries, interval
         * new RetryUntilElapsed(10000, 1000)   // retry at the interval until the time budget elapses
         * new ExponentialBackoffRetry(1000, 10) // increasing intervals between retries
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //new RetryUntilElapsed(10000, 1000);
        //new RetryNTimes(3, 1000);
        /**
         * Many ways to create the client; a fluent builder is also available:
         * CuratorFrameworkFactory.builder().xxx.build()
         */
        curatorFramework = CuratorFrameworkFactory.newClient(servers,
                2000, 5000, retryPolicy); // connect string, session timeout, connection timeout, retry policy
        curatorFramework.start();
    }
    // create
    @Test
    public void testCreate() throws Exception {
        curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT) // persistent node; CreateMode.EPHEMERAL creates one that disappears when the session closes
                .forPath("/test", SerializationUtils.serialize(new Date()));
    }

    @Test
    public void testSetData() throws Exception {
        curatorFramework.setData()
                .inBackground() // executes asynchronously
                .forPath("/test", SerializationUtils.serialize(new Integer(100)));
    }

    @Test
    public void testGetData() throws Exception {
        byte[] bytes = curatorFramework.getData()
                .forPath("/test");
        Object obj = SerializationUtils.deserialize(bytes);
        System.out.println(obj.getClass() + " " + obj);
    }

    @Test
    public void testExits() throws Exception {
        Stat stat = curatorFramework.checkExists().forPath("/baizhi");
        System.out.println(stat == null);
    }

    @Test
    public void testDelete() throws Exception {
        curatorFramework.delete()
                .deletingChildrenIfNeeded() // equivalent to rmr: recursive delete
                .forPath("/test");
    }

    @Test
    public void testCreateChild() throws Exception {
        curatorFramework.create().creatingParentsIfNeeded().forPath("/test/192.168.0.1:20881", "你好".getBytes());
        curatorFramework.create().creatingParentsIfNeeded().forPath("/test/192.168.0.2:20881", "hello".getBytes());
    }

    @Test
    public void testGetChildren() throws Exception {
        List<String> nodes = curatorFramework.getChildren().forPath("/test");
        for (String node : nodes) {
            System.out.println(node);
        }
    }
    /**
     * Node watching: detect changes to a node's children [important]
     */
    @Test
    public void testChildNodeChange() throws Exception {
        PathChildrenCache pcc = new PathChildrenCache(curatorFramework, "/test", true);
        pcc.start();
        // watch for child-node changes
        pcc.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                // event type, e.g. CHILD_ADDED or CHILD_REMOVED
                PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                System.out.println("event type: " + type.name());
                ChildData data = pathChildrenCacheEvent.getData();
                System.out.println("data: " + data.getPath() + " " + new String(data.getData()));
            }
        });
        System.in.read(); // keep the JVM alive; if it exits, the ephemeral-node events are never observed
        pcc.close();
    }
    /**
     * Centralized configuration [important]
     */
    @Test
    public void testNodeDataChange() throws Exception {
        NodeCache nc = new NodeCache(curatorFramework, "/test");
        nc.start();
        nc.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                if (curatorFramework.checkExists().forPath("/test") != null) { // note the leading slash in the path
                    byte[] bytes = curatorFramework.getData().forPath("/test");
                    System.out.println("data changed: " + SerializationUtils.deserialize(bytes));
                } else {
                    System.out.println("node was deleted...");
                }
            }
        });
        System.in.read();
        nc.close();
    }
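The tests above write znode payloads with SerializationUtils from commons-lang3, which is a thin wrapper over standard Java serialization. A JDK-only sketch of the same round trip, to show what the bytes stored in the znode actually are (class and method names here are illustrative, not the library's API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerDemo {
    // Equivalent in spirit to SerializationUtils.serialize
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Equivalent in spirit to SerializationUtils.deserialize
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = serialize(Integer.valueOf(100)); // roughly what testSetData writes to /test
        System.out.println("payload length: " + payload.length);
        System.out.println("round trip: " + deserialize(payload)); // prints 100
    }
}
```

Because this is Java serialization, every reader of the znode must have the payload class on its classpath; plain UTF-8 strings (as in testCreateChild) avoid that coupling.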
- ZooKeeper distributed lock [interview topic]
Only after A releases the lock can B acquire it; the lock works across processes.
Under high concurrency, e.g. a flash-sale scenario, clustered services share one variable; to guarantee that only one service mutates it at a time, guard the update with a distributed lock.
Redis distributed lock: use setnx to set the lock; only after del releases it can the lock be set again. What advantage does ZooKeeper have over the Redis lock? [interview question]
ZooKeeper protects the critical resource while also guaranteeing ordering (fairness); neither the Redis lock nor synchronized guarantees order.
    /**
     * Distributed lock [important]
     */
    @Test
    public void testDistributeLock() throws Exception {
        System.out.println("service B");
        // the lock path acts as the lock identifier; in practice derive it from the class/method name
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/com.test.service.IUserSevice#updateUser1");
        if (lock.acquire(1, TimeUnit.MINUTES)) { // max wait time, unit
            try {
                System.out.println("lock acquired...");
                Thread.sleep(10000);
                System.out.println("updating the critical resource");
            } finally {
                System.out.println("releasing lock...");
                lock.release();
            }
        }
    }
- ZooKeeper leader election [overview]
Idea: each candidate creates a node tied to its session; the candidate with the smallest node becomes leader and its takeLeadership method is invoked, while the others become followers. When the leader's session ends (or takeLeadership returns), the remaining nodes elect a new leader.

    @Test
    public void testElection() throws IOException {
        LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
            /**
             * While inside this method you are the leader; returning from it relinquishes leadership.
             */
            public void takeLeadership(CuratorFramework client) throws Exception {
                // this callback will get called when you are the leader
                // do whatever leader work you need to and only exit
                // this method when you want to relinquish leadership
                System.out.println("I am leader B");
                Thread.sleep(new Random().nextInt(1000));
                System.out.println("I am stepping down; the other nodes should elect a new leader");
            }
        };
        LeaderSelector selector = new LeaderSelector(curatorFramework, "/hadoop-ha", listener);
        selector.autoRequeue(); // not required, but this is behavior that you will probably expect
        selector.start();
        System.in.read();
    }

    @After
    public void after(){
        // close the session
        curatorFramework.close();
    }
}