大数据之Zookeeper(六)-Zookeeper的API应用&监听服务器节点动态上下线案例

it2025-09-11  7

ZookeeperAPI应用

1、创建一个Maven工程

2、添加pom文件

<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency> </dependencies>

3、需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入下面代码

log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4、创建Zookeeper客户端

package com.zhukun.Zookeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.Test; import java.io.IOException; public class TestZookeeper { /* * 查看端口号:cd conf * vim zoo.cfg*/ private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private static int sessionTimeout = 2000; private ZooKeeper zkClient = null; @Test public void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 收到事件通知后的回调函数(用户的业务逻辑) System.out.println(event.getType() + "--" + event.getPath()); // 再次启动监听 try { zkClient.getChildren("/", true); } catch (Exception e) { e.printStackTrace(); } } }); } }

测试: 先群起集群的Zookeeper服务器: 运行代码控制台输出如下,说明客户端连接服务器成功

5、创建子节点

// 创建子节点 @Test public void create() throws Exception { // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型 String nodeCreated = zkClient.create("/zhukun", "dashuaige".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("成功创建子节点"); }

运行测试: Linux系统shell命令查看是否成功创建子节点

6、获取子节点并监听节点变化

// 获取子节点 @Test public void getChildren() throws Exception { List<String> children = zkClient.getChildren("/", true); for (String child : children) { System.out.println(child); } // 延时阻塞 Thread.sleep(Long.MAX_VALUE); }

测试: Zookeeper服务器原有节点 运行代码: 对应没问题,再用shell命令创建子节点,查看代码运行程序是否监听 成功监听到节点的变化

7、判断节点是否存在

// 判断znode是否存在 @Test public void exist() throws Exception { //判断根目录下是否有zk这个节点 Stat stat = zkClient.exists("/zhukun", false); System.out.println(stat == null ? "not exist" : "exist"); }

运行测试: 实际上存在zk节点 控制台输出 shell命令删除zk节点再运行

监听服务器节点动态上下线

1、需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

2、需求分析

3、具体实现

1、先在集群上创建/servers节点 2、服务器端向Zookeeper注册代码

package com.zhukun.zkcase; import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; public class DistributeServer { private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private static int sessionTimeout = 2000; private ZooKeeper zk = null; private String parentNode = "/servers"; // 创建到zk的客户端连接 public void getConnect() throws IOException{ zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { } }); } // 注册服务器 public void registServer(String hostname) throws Exception{ String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname +" is online "+ create); } // 业务功能 public void business(String hostname) throws Exception{ System.out.println(hostname+" is working ..."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 1 获取zk连接 DistributeServer server = new DistributeServer(); server.getConnect(); // 2 利用zk连接注册服务器信息 server.registServer(args[0]); // 3 启动业务功能 server.business(args[0]); } }

客户端代码:

package com.zhukun.zkcase; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class DistributeClient { private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private static int sessionTimeout = 2000; private ZooKeeper zk = null; private String parentNode = "/servers"; // 创建到zk的客户端连接 public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 再次启动监听 try { getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); } // 获取服务器列表信息 public void getServerList() throws Exception { // 1获取服务器子节点信息,并且对父节点进行监听 List<String> children = zk.getChildren(parentNode, true); // 2存储服务器信息列表 ArrayList<String> servers = new ArrayList<String>(); // 3遍历所有节点,获取节点中的主机名称信息 for (String child : children) { byte[] data = zk.getData(parentNode + "/" + child, false, null); servers.add(new String(data)); } // 4打印服务器列表信息 System.out.println(servers); } // 业务功能 public void business() throws Exception{ System.out.println("client is working ..."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 1获取zk连接 DistributeClient client = new DistributeClient(); client.getConnect(); // 2获取servers的子节点信息,从中获取服务器信息列表 client.getServerList(); // 3业务进程启动 client.business(); } }

测试: 客户端运行: 此时没有节点: shell命令创建,观察控制台变化

服务器端测试: 运行的时候给main()函数传参

最新回复(0)