要使用zookeeper原生的api,需引入下面的jar包:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.2</version> </dependency>ZooKeeper构造方法参数说明:
connectString:多个zookeeper IP地址用逗号分隔,也可以在后面带上节点,这样后续对节点的所有操作都是在这个节点之下,例如connectString为10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182/xxxx,然后使用create /app,这样创建出来的节点的绝对路径为/xxxx/app,这样操作一定要确保/xxxx先存在,否则会报错。sessionTimeout:session超时时间,单位为ms。watcher:监听连接的状态,代码中使用了同步计数器CountDownLatch,因为new Zookeeper创建对象立马就会返回了,而客户端连接到服务端是耗时的,这个时候并没有真正的连接成功,如果这个时候拿zk客户端对象去做操作会报错,所以要等待连接建立成功的时候才能使用客户端对象。节点创建成功后会返回创建节点的真实路径,如果是非顺序节点,那么这个路径就是传入的路径,如果是顺序节点,那么这个路径就是传入的路径+序号。
创建一个已经存在的节点会报错:NodeExistsException,由此可见不能重复创建节点。
zooKeeper.create("/app1", "xxoo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);运行结果如下:
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /app1 at org.apache.zookeeper.KeeperException.create(KeeperException.java:126) at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ... ...创建一个父节点不存在的节点会报错:NoNodeException,由此可见不会自动递归创建节点。
zooKeeper.create("/a/b", "bbb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);运行结果如下:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /a/b at org.apache.zookeeper.KeeperException.create(KeeperException.java:118) at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1734) ... ...带回调的异步创建节点的方式:
zooKeeper.create("/app5", "app5".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> { log.info("create /app5:{}", name); }, null);带回调的异步获取节点数据的方式:
zooKeeper.getData("/app1", null, (rc, path, ctx, data, stat) -> { log.info("/app1 data:{}", new String(data)); }, null);删除一个存在子节点的节点会报错:NotEmptyException,由此可见不会自动递归删除节点。
运行结果如下:
2020-10-20 14:41:46,054 INFO [main] (ZookeeperDemo.java:164) - /a children:[b, c, d]getChildren只会返回所有子节点的集合,不包含孙子节点。
在创建zookeeper会话时会传入一个Watcher,里面有两个特别关键的类:KeeperState(连接状态)和EventType(事件类型)。
KeeperState表示的是客户端与服务端连接的状态。
连接状态描述Disconnected客户端与服务器断开连接SyncConnected客户端与服务器建立连接AuthFailed客户端进行连接认证失败ConnectedReadOnly客户端连接到的zookeeper服务是只读的SaslAuthenticated用于通知客户端它们是SASL认证的Expired客户端心跳检测没有收到服务端的响应时即认定断开连接,session失效EventType表示的是节点发生变化时所触发的事件类型。
事件类型描述NodeCreated被监听的节点被创建NodeChildrenChanged被监听的节点的直接子节点被创建、被删除、子节点数据发生变更NodeDataChanged被监听的节点的数据发生变更NodeDeleted被监听的节点被删除None客户端的连接状态(KeeperState)发生变更zookeeper中给节点添加watcher的方式有两种:
使用默认的watcher: List<String> getChildren(String path, boolean watch) byte[] getData(String path, boolean watch, Stat stat) Stat exists(String path, boolean watch)上面三个方法中都有一个boolean类型的watch参数,当watch==true时使用的就是默认的watcher,而默认的watcher就是创建连接的构造方法中的watcher,也可以通过register(Watcher watcher)注册默认的watcher。
给节点指定的watcher: List<String> getChildren(String path, Watcher watcher) byte[] getData(String path, Watcher watcher, Stat stat) Stat exists(String path, Watcher watcher)上面三个方法中都有个Watcher类型的watch参数,可以通过传递这个参数给节点指定watcher。
创建会话并注入一个默认的Watcher:
private ZooKeeper zookeeper; private CountDownLatch countDownLatch = new CountDownLatch(1); @Before public void getZkClient() throws InterruptedException, IOException { if (null == zookeeper) { zookeeper = new ZooKeeper("10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182/xxxx", 6000, new MyWatcher("Default Watcher")); countDownLatch.await(); } } class MyWatcher implements Watcher { private String name; public MyWatcher(String name) { this.name = name; } public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); //获取事件的状态 Event.EventType type = watchedEvent.getType(); //获取事件的类型 if (Event.KeeperState.SyncConnected.equals(state)) { switch (type) { case None: log.info("zookeeper connected success"); countDownLatch.countDown(); break; case NodeCreated: log.info("[{}] create node: {}", name, watchedEvent.getPath()); break; case NodeDeleted: log.info("[{}] delete node: {}", name, watchedEvent.getPath()); break; case NodeDataChanged: log.info("[{}] node data change: {}", name, watchedEvent.getPath()); break; case NodeChildrenChanged: log.info("[{}] node children change: {}", name, watchedEvent.getPath()); break; } } } }exists使用默认的watcher:
zookeeper.exists("/p", true); zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.exists("/p", true); zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.exists("/p", true); zookeeper.setData("/p", "pp".getBytes(), -1); zookeeper.exists("/p", true); zookeeper.delete("/p/c", -1); zookeeper.exists("/p", true); zookeeper.delete("/p", -1);运行结果如下:
2020-10-21 09:46:30,653 INFO [main-EventThread] (ZookeeperWatcherDemo.java:42) - zookeeper connected success 2020-10-21 09:46:30,670 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Default Watcher] create node: /p 2020-10-21 09:46:30,681 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher}] node data change: /p 2020-10-21 09:46:30,688 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p从运行结果可以发现exists注册的watcher能监听节点的事件为:NodeCreated、NodeDataChanged、NodeDeleted。
getData使用默认的watcher:
zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getData("/p", true, null); zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getData("/p", true, null); zookeeper.setData("/p", "pp".getBytes(), -1); zookeeper.getData("/p", true, null); zookeeper.delete("/p/c", -1); zookeeper.getData("/p", true, null); zookeeper.delete("/p", -1);运行结果如下:
2020-10-21 09:58:51,948 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher] node data change: /p 2020-10-21 09:58:51,956 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p从运行结果可以发现getData注册watcher能监听节点的事件为:NodeDeleted、NodeDataChanged,不能监听NodeCreated事件(getData一个不存在的节点会抛出异常NoNodeException)。
getChildren使用默认的watcher:
zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getChildren("/p", true); zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zookeeper.getChildren("/p", true); zookeeper.setData("/p", "pp".getBytes(), -1); zookeeper.getChildren("/p", true); zookeeper.delete("/p/c", -1); zookeeper.getChildren("/p", true); zookeeper.delete("/p", -1);运行结果如下:
2020-10-21 10:06:03,887 INFO [main-EventThread] (ZookeeperWatcherDemo.java:55) - [Default Watcher] node children change: /p 2020-10-21 10:06:03,899 INFO [main-EventThread] (ZookeeperWatcherDemo.java:55) - [Default Watcher] node children change: /p 2020-10-21 10:06:03,905 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p从运行结果可以发现getChildren注册watcher能监听节点的事件为:NodeDeleted、NodeChildrenChanged。
运行结果如下:
2020-10-21 10:21:17,377 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Default Watcher] create node: /p 2020-10-21 10:21:17,377 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Customer Watcher] create node: /p结论:当一个节点注册了多个watcher,那么多个watcher的方法都会被回调。
运行结果如下:
2020-10-21 10:22:41,025 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher] node data change: /p结论:当一个节点用不同的方法都注册了同一个watcher时,watcher的方法只会回调一次。
不同注册watcher的方法与可监听事件的关系:
注册方式NodeCreatedNodeChildrenChangedNodeDataChangedNodeDeletedgetChildrenYYexistsYYYgetDataYY通过观察运行结果,总结如下:
注册一次watcher只会收到一次通知,想一直监听就得收到通知后再次注册。同一个watcher实例被例如exists,getData等方法多次注册,zookeeper客户端也只会收到一次通知。当一个节点注册多个不同的watcher实例时,会通知多次,即每个被注册的watcher都会收到通知。exists可以监听一个不存在的节点,但是getData和getChildren不能监控一个不存在的节点,否则会报NoNodeException。zookeeper原生客户端的缺点
不能递归的创建节点和删除节点。对节点的数据操作基于字节数组(二进制安全),经常需要数组和字符串之间的转换。想一直监听某个节点就要一直注册,监听一些不存在的节点可能会抛出异常。