zookeeper原生客户端api的使用

it2024-03-17  59

zookeeper原生api的使用

要使用zookeeper原生的api,需引入下面的jar包:

<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.2</version> </dependency>

基本操作

创建会话

package com.morris.zookeeper.zookeeper; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; @Slf4j public abstract class ZookeeperFactory { public static ZooKeeper create() throws IOException, InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); final ZooKeeper zooKeeper = new ZooKeeper("10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182", 6000, new Watcher() { public void process(WatchedEvent event) { if(event.getState().equals(Event.KeeperState.SyncConnected)) { log.info("connected success"); countDownLatch.countDown(); } } }); countDownLatch.await(); return zooKeeper; } }

ZooKeeper构造方法参数说明:

connectString:多个zookeeper IP地址用逗号分隔,也可以在后面带上节点,这样后续对节点的所有操作都是在这个节点之下,例如connectString为10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182/xxxx,然后使用create /app,这样创建出来的节点的绝对路径为/xxxx/app,这样操作一定要确保/xxxx先存在,否则会报错。sessionTimeout:session超时时间,单位为ms。watcher:监听连接的状态,代码中使用了同步计数器CountDownLatch,因为new Zookeeper创建对象立马就会返回了,而客户端连接到服务端是耗时的,这个时候并没有真正的连接成功,如果这个时候拿zk客户端对象去做操作会报错,所以要等待连接建立成功的时候才能使用客户端对象。

创建节点

String app1 = zooKeeper.create("/app1", "app1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// 创建持久节点 String app2 = zooKeeper.create("/app2", "app2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); // 创建持久顺序节点 String app3 = zooKeeper.create("/app3", "app3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 创建临时节点 String app4 = zooKeeper.create("/app4", "app4".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 创建临时顺序节点

节点创建成功后会返回创建节点的真实路径,如果是非顺序节点,那么这个路径就是传入的路径,如果是顺序节点,那么这个路径就是传入的路径+序号。

创建一个已经存在的节点会报错:NodeExistsException,由此可见不能重复创建节点。

zooKeeper.create("/app1", "xxoo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

运行结果如下:

org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /app1 at org.apache.zookeeper.KeeperException.create(KeeperException.java:126) at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ... ...

创建一个父节点不存在的节点会报错:NoNodeException,由此可见不会自动递归创建节点。

zooKeeper.create("/a/b", "bbb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

运行结果如下:

org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /a/b at org.apache.zookeeper.KeeperException.create(KeeperException.java:118) at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1734) ... ...

带回调的异步创建节点的方式:

zooKeeper.create("/app5", "app5".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> { log.info("create /app5:{}", name); }, null);

获取节点数据

byte[] data = zooKeeper.getData("/app1", null, null); log.info("/app1 data:{}", new String(data));

带回调的异步获取节点数据的方式:

zooKeeper.getData("/app1", null, (rc, path, ctx, data, stat) -> { log.info("/app1 data:{}", new String(data)); }, null);

更新节点数据

zooKeeper.setData("/app1", "morris".getBytes(), -1);

判断节点是否存在

Stat stat = zooKeeper.exists("/app1", false); log.info("/app1 stat: {}", stat); // 8589934605,8589934622,1603164678863,1603165733893,1,0,0,0,6,0,8589934605 Stat s = zooKeeper.exists("/ooxx", false); log.info("/ooxx stat: {}", s); // null 节点不存在返回null

删除节点

zooKeeper.delete("/app1", -1);

删除一个存在子节点的节点会报错:NotEmptyException,由此可见不会自动递归删除节点。

获取子节点数据

zooKeeper.create("/a", "a".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zooKeeper.create("/a/b", "ab".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zooKeeper.create("/a/c", "ac".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zooKeeper.create("/a/d", "ad".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zooKeeper.create("/a/b/x", "abx".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); List<String> children = zooKeeper.getChildren("/a", false); log.info("/a children:{}", children);

运行结果如下:

2020-10-20 14:41:46,054 INFO [main] (ZookeeperDemo.java:164) - /a children:[b, c, d]

getChildren只会返回所有子节点的集合,不包含孙子节点。

实现递归创建节点

private void create(String path, String data) throws KeeperException, InterruptedException { String[] split = path.split("/"); StringBuilder p = new StringBuilder(); for (int i = 1; i < split.length - 1; i++) { p.append("/").append(split[i]); if(null == zookeeper.exists(p.toString(), null)) { zookeeper.create(p.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } zookeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }

实现递归删除节点

private void delete(String path) throws KeeperException, InterruptedException { List<String> children = zookeeper.getChildren(path, false); for (String child : children) { delete(path + "/" + child); } zookeeper.delete(path, -1); }

Watcher监听机制

在创建zookeeper会话时会传入一个Watcher,里面有两个特别关键的类:KeeperState(连接状态)和EventType(事件类型)。

KeeperState

KeeperState表示的是客户端与服务端连接的状态。

连接状态描述Disconnected客户端与服务器断开连接SyncConnected客户端与服务器建立连接AuthFailed客户端进行连接认证失败ConnectedReadOnly客户端连接到的zookeeper服务是只读的SaslAuthenticated用于通知客户端它们是SASL认证的Expired客户端心跳检测没有收到服务端的响应时即认定断开连接,session失效

EventType

EventType表示的是节点发生变化时所触发的事件类型。

事件类型描述NodeCreated被监听的节点被创建NodeChildrenChanged被监听的节点的直接子节点被创建、被删除、子节点数据发生变更NodeDataChanged被监听的节点的数据发生变更NodeDeleted被监听的节点被删除None客户端的连接状态(KeeperState)发生变更

zookeeper中的watcher

zookeeper中给节点添加watcher的方式有两种:

使用默认的watcher: List<String> getChildren(String path, boolean watch) byte[] getData(String path, boolean watch, Stat stat) Stat exists(String path, boolean watch)

上面三个方法中都有一个boolean类型的watch参数,当watch==true时使用的就是默认的watcher,而默认的watcher就是创建连接的构造方法中的watcher,也可以通过register(Watcher watcher)注册默认的watcher。

给节点指定的watcher: List<String> getChildren(String path, Watcher watcher) byte[] getData(String path, Watcher watcher, Stat stat) Stat exists(String path, Watcher watcher)

上面三个方法中都有个Watcher类型的watch参数,可以通过传递这个参数给节点指定watcher。

watcher测试

创建会话并注入一个默认的Watcher:

private ZooKeeper zookeeper; private CountDownLatch countDownLatch = new CountDownLatch(1); @Before public void getZkClient() throws InterruptedException, IOException { if (null == zookeeper) { zookeeper = new ZooKeeper("10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182/xxxx", 6000, new MyWatcher("Default Watcher")); countDownLatch.await(); } } class MyWatcher implements Watcher { private String name; public MyWatcher(String name) { this.name = name; } public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); //获取事件的状态 Event.EventType type = watchedEvent.getType(); //获取事件的类型 if (Event.KeeperState.SyncConnected.equals(state)) { switch (type) { case None: log.info("zookeeper connected success"); countDownLatch.countDown(); break; case NodeCreated: log.info("[{}] create node: {}", name, watchedEvent.getPath()); break; case NodeDeleted: log.info("[{}] delete node: {}", name, watchedEvent.getPath()); break; case NodeDataChanged: log.info("[{}] node data change: {}", name, watchedEvent.getPath()); break; case NodeChildrenChanged: log.info("[{}] node children change: {}", name, watchedEvent.getPath()); break; } } } }

watcher

exists使用默认的watcher:

zookeeper.exists("/p", true); zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.exists("/p", true); zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.exists("/p", true); zookeeper.setData("/p", "pp".getBytes(), -1); zookeeper.exists("/p", true); zookeeper.delete("/p/c", -1); zookeeper.exists("/p", true); zookeeper.delete("/p", -1);

运行结果如下:

2020-10-21 09:46:30,653 INFO [main-EventThread] (ZookeeperWatcherDemo.java:42) - zookeeper connected success 2020-10-21 09:46:30,670 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Default Watcher] create node: /p 2020-10-21 09:46:30,681 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher}] node data change: /p 2020-10-21 09:46:30,688 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p

从运行结果可以发现exists注册的watcher能监听节点的事件为:NodeCreated、NodeDataChanged、NodeDeleted。

getData

getData使用默认的watcher:

zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getData("/p", true, null); zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getData("/p", true, null); zookeeper.setData("/p", "pp".getBytes(), -1); zookeeper.getData("/p", true, null); zookeeper.delete("/p/c", -1); zookeeper.getData("/p", true, null); zookeeper.delete("/p", -1);

运行结果如下:

2020-10-21 09:58:51,948 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher] node data change: /p 2020-10-21 09:58:51,956 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p

从运行结果可以发现getData注册watcher能监听节点的事件为:NodeDeleted、NodeDataChanged,不能监听NodeCreated事件(getData一个不存在的节点会抛出异常NoNodeException)。

getChildren

getChildren使用默认的watcher:

zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getChildren("/p", true); zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getChildren("/p", true); zookeeper.setData("/p", "pp".getBytes(), -1); zookeeper.getChildren("/p", true); zookeeper.delete("/p/c", -1); zookeeper.getChildren("/p", true); zookeeper.delete("/p", -1);

运行结果如下:

2020-10-21 10:06:03,887 INFO [main-EventThread] (ZookeeperWatcherDemo.java:55) - [Default Watcher] node children change: /p 2020-10-21 10:06:03,899 INFO [main-EventThread] (ZookeeperWatcherDemo.java:55) - [Default Watcher] node children change: /p 2020-10-21 10:06:03,905 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p

从运行结果可以发现getChildren注册watcher能监听节点的事件为:NodeDeleted、NodeChildrenChanged。

用同一个方法注册多个watcher

zookeeper.exists("/p", true); // 注册默认的wachter zookeeper.exists("/p", new MyWatcher("Customer Watcher")); // 再注册一个watcher zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.delete("/p", -1);

运行结果如下:

2020-10-21 10:21:17,377 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Default Watcher] create node: /p 2020-10-21 10:21:17,377 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Customer Watcher] create node: /p

结论:当一个节点注册了多个watcher,那么多个watcher的方法都会被回调。

用不同的方法注册同一个watcher多次

zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getData("/p", true, null); // 注册默认的wachter zookeeper.exists("/p", true); // 注册两次默认的wachter zookeeper.setData("/p", "ppp".getBytes(), -1); zookeeper.delete("/p", -1);

运行结果如下:

2020-10-21 10:22:41,025 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher] node data change: /p

结论:当一个节点用不同的方法都注册了同一个watcher时,watcher的方法只会回调一次。

总结

不同注册watcher的方法与可监听事件的关系:

注册方式NodeCreatedNodeChildrenChangedNodeDataChangedNodeDeletedgetChildrenYYexistsYYYgetDataYY

通过观察运行结果,总结如下:

注册一次watcher只会收到一次通知,想一直监听就得收到通知后再次注册。同一个watcher实例被例如exists,getData等方法多次注册,zookeeper客户端也只会收到一次通知。当一个节点注册多个不同的watcher实例时,会通知多次,即每个被注册的watcher都会收到通知。exists可以监听一个不存在的节点,但是getData和getChildren不能监控一个不存在的节点,否则会报NoNodeException。

zookeeper原生客户端的缺点

不能递归的创建节点和删除节点。对节点的数据操作基于字节数组(二进制安全),经常需要数组和字符串之间的转换。想一直监听某个节点就要一直注册,监听一些不存在的节点可能会抛出异常。
最新回复(0)