watcher网上
watcher解决的问题
在进入watcher之前我们先试想在应用服务器集群中可能存在的两个问题:
因为集群中有很多机器,当某个通用的配置发生变化后,怎么让自动的让所有服务器的配置统一生效?当集群中某个节点宕机,如何让集群中的其他节点知道?
为了解决这两个问题,zookeeper引入了watcher机制来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。 watcher基本原理
zookeeper中实现watcher需要有三个部分,如下图所示:
分别是zookeeper服务端、客户端以及客户端的watchManager。 如图所示,客户端向zk注册watcher的同时,会将客户端的watcher对象存储在客户端的WatchManager中;zk服务器触发watch事件后,会向客户端发送通知,客户端线程从watchManager中取出对应watcher执行。
客户端如何实现事件通知的动作
客户端只需定义一个类实现org.apache.zookeeper.Watcher接口并实现接口中的如下方法:
abstract public void process(WatchedEvent event
);
即可在得到通知后执行相应的动作。参数org.apache.zookeeper.WatchedEvent是zk服务端传过来的事件,有三个成员:
final private KeeperState keeperState
;
final private EventType eventType
;
private String path
;
keeperState是个枚举对象,代表客户端和zk服务器的链接状态;eventType也是个枚举类型,代表节点发生的事件类型,比如创建新的子节点、改变节点数据等;
对于NodeDataChanged事件:无论节点数据发生变化还是数据版本发生变化都会触发(即使被更新数据与新数据一样,数据版本都会发生变化)。 对于NodeChildrenChanged事件:新增和删除子节点会触发该事件类型。 需要注意的是:WatchedEvent只是事件相关的通知,并没有对应数据节点的原始数据内容及变更后的新数据内容,因此如果需要知道变更前的数据或变更后的新数据,需要业务保存变更前的数据和调用接口获取新的数据
如何注册watcher
watcher注册api
可以在创建zk客户端实例的时候注册watcher(构造方法中注册watcher):
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
)
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
,ZKClientConfig conf
)
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
, boolean canBeReadOnly
, HostProvider aHostProvider
)
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
, boolean canBeReadOnly
, HostProvider aHostProvider
,ZKClientConfig clientConfig
)
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
, boolean canBeReadOnly
)
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
, boolean canBeReadOnly
, ZKClientConfig conf
)
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
, long sessionId
, byte[] sessionPasswd
)
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
, long sessionId
, byte[] sessionPasswd
, boolean canBeReadOnly
, HostProvider aHostProvider
)
public ZooKeeper(String connectString
, int sessionTimeout
, Watcher watcher
, long sessionId
, byte[] sessionPasswd
, boolean canBeReadOnly
)
ZooKeeper的构造方法中传入的watcher将会作为整个zk会话期间的默认watcher,该watcher会一直保存为客户端ZKWatchManager的defaultWatcher成员,如果有其他的设置,这个watcher会被覆盖。
除了可以通过ZooKeeper类的构造方法注册watcher外,还可以通过ZooKeeper类中其他一些api来注册watcher,只不过这些api注册的watcher就不是默认watcher了(以下每个注册watcher的方法有很多个重载的方法,就不一一列举出来)。
案例
public class ZookeeperServer implements Watcher {
private static Log log
= LogFactory
.getLog(ZookeeperServer
.class);
private static final String ZK_ADDRESS
= ZookeeperConfig
.getAddress();
private static final String ZK_NODE_PATH
= ZookeeperConfig
.getZKNodePath();
private static final String ZK_CODE_PATH
= ZK_NODE_PATH
+ ZookeeperConfig
.getZKCodePath();
private static final String ZK_IP_MODEL_PATH
= ZK_NODE_PATH
+ ZookeeperConfig
.getZKIPModelPath();
private static final int SESSION_TIMEOUT
= ZookeeperConfig
.getSessionTimeout();
private ZooKeeper zk
= null
;
private Stat stat
= new Stat();
private CountDownLatch countDownLatch
= new CountDownLatch(1);
private static volatile ZookeeperServer instance
= null
;
private ZookeeperServer() {
}
public static ZookeeperServer
getInstance() {
if (instance
== null
) {
synchronized (ZookeeperServer
.class) {
if (instance
== null
) {
instance
= new ZookeeperServer();
try {
instance
.initZookeeperServer();
} catch (IOException e
) {
e
.printStackTrace();
} catch (InterruptedException e
) {
e
.printStackTrace();
} catch (KeeperException e
) {
e
.printStackTrace();
}
}
}
}
return instance
;
}
private static Map
<String, EsbGateway> gatewayMap
= new HashMap<>();
public static Map
<String, EsbGateway> getEsbGatewayMap() {
return gatewayMap
;
}
public static EsbGateway
getGateway(String gatewayCode
) {
EsbGateway gateway
= gatewayMap
.get(gatewayCode
);
if (gateway
!= null
) {
return gateway
;
}
return null
;
}
private static Map
<String, IpStrategy> ipStrategMap
= new HashMap<>();
public static Map
<String, IpStrategy> getIpStrategyMap() {
return ipStrategMap
;
}
private static Map
<String, IpModel> ipModelMap
= new HashMap<>();
public static Map
<String, IpModel> getIpModelMap() {
return ipModelMap
;
}
public void initZookeeperServer() throws IOException
, InterruptedException
, KeeperException
{
log
.info("初始化zookeeper------begin");
log
.info("初始化zookeeper地址为------" + ZK_ADDRESS
);
log
.info("初始化zookeeper节点路径为------" + ZK_NODE_PATH
);
log
.info("初始化zookeeper会话超时时间为------" + SESSION_TIMEOUT
);
zk
= new ZooKeeper(ZK_ADDRESS
, SESSION_TIMEOUT
, this);
countDownLatch
.await();
initZookeeperData();
log
.info("初始化zookeeper------end");
}
public void initZookeeperData() {
log
.info("zookeeper初始化数据");
ipStrategMap
= IpService
.getStrategyListMap();
ipModelMap
= IpService
.getipModelsMap();
gatewayMap
= GatewayService
.getAllGatewayMap();
try {
Gson gson
= new Gson();
String gatewaycode
= gson
.toJson(gatewayMap
);
String ipModel
= gson
.toJson(ipModelMap
);
String ipStrateg
= gson
.toJson(ipStrategMap
);
createZkData(ZK_NODE_PATH
, "");
createZkData(ZK_CODE_PATH
, gatewaycode
);
createZkData(ZK_IP_MODEL_PATH
, ipModel
);
} catch (Exception e
) {
e
.printStackTrace();
}
}
public void createZkData(String path
, String data
) throws IOException
, InterruptedException
, KeeperException
{
Stat stat
= zk
.exists(path
, true);
if (stat
== null
) {
zk
.create(path
, data
.getBytes(), ZooDefs
.Ids
.OPEN_ACL_UNSAFE
,
CreateMode
.PERSISTENT
);
} else {
zk
.setData(path
, data
.getBytes(), -1);
}
}
@SuppressWarnings("unchecked")
@Override
public void process(WatchedEvent event
) {
Event
.KeeperState keeperState
= event
.getState();
Event
.EventType eventType
= event
.getType();
if (Event
.KeeperState
.SyncConnected
== keeperState
) {
if (Event
.EventType
.None
== eventType
) {
countDownLatch
.countDown();
log
.info("zookeeperServer启动连接。。。");
} else if (eventType
== Event
.EventType
.NodeDataChanged
) {
try {
Gson gson
= new Gson();
if (event
.getPath().equals(ZK_CODE_PATH
)) {
String gateway
= new String(zk
.getData(event
.getPath(), true, stat
));
log
.info("zookeeperServer微服务信息发生变化");
Type type
= new TypeToken<Map
<String, EsbGateway>>() {
}.getType();
gatewayMap
= gson
.fromJson(gateway
, type
);
} else if (event
.getPath().equals(ZK_IP_MODEL_PATH
)) {
String ipModel
= new String(zk
.getData(event
.getPath(), true, stat
));
log
.info("zookeeperServer监听微服务ip策略发生变化");
Type type
= new TypeToken<Map
<String, IpModel>>() {
}.getType();
ipModelMap
= gson
.fromJson(ipModel
, type
);
}
} catch (Exception e
) {
e
.printStackTrace();
log
.error("zookeeperServer监听数据异常:" + ExceptionUtil
.getExceptionInfo(e
));
}
}
} else if (keeperState
== Event
.KeeperState
.Expired
) {
try {
this.zk
.close();
this.zk
= new ZooKeeper(ZK_ADDRESS
, SESSION_TIMEOUT
, this);
log
.info("zookeeperServer重新连接");
} catch (Exception e
) {
log
.error("zookeeperServer连接失败:" + ExceptionUtil
.getExceptionInfo(e
));
}
}
}
}