Watcher: data change notifications

it2025-04-27  18

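ZooKeeper provides publish/subscribe for distributed data and implements distributed notification through its Watcher mechanism. A client registers a Watcher with the server; when a specified event on the server triggers that Watcher, the server notifies the client, which then handles the event. The figure below gives an overview of the Watcher mechanism:

As the figure shows, when a client registers a Watcher with ZooKeeper it also stores that Watcher in its own WatchManager. When the ZooKeeper server triggers the Watcher, it notifies the client, which then retrieves the matching Watcher from the WatchManager and runs its callback logic.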

The Watcher interface

The Watcher interface represents a standard event handler. It declares a single abstract method:

abstract public void process(WatchedEvent event);

WatchedEvent

    /**
     * A WatchedEvent represents a change on the ZooKeeper that a Watcher
     * is able to respond to. The WatchedEvent includes exactly what happened,
     * the current state of the ZooKeeper, and the path of the znode that
     * was involved in the event.
     */
    public class WatchedEvent {
        // ZooKeeper state when the event was triggered
        final private KeeperState keeperState;
        // type of the triggered event
        final private EventType eventType;
        // path of the znode involved in the event
        private String path;

        // some methods omitted

        /**
         * Convert WatchedEvent to type that can be sent over network
         */
        public WatcherEvent getWrapper() {
            return new WatcherEvent(eventType.getIntValue(),
                                    keeperState.getIntValue(),
                                    path);
        }
    }

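Differences and connections between WatchedEvent and WatcherEvent

In essence the two are the same thing: both wrap a server-side event. WatchedEvent is the logical event object used while the client and the server process the event, whereas WatcherEvent implements the serialization interface and can therefore be sent over the network. WatchedEvent provides the getWrapper method to convert a WatchedEvent into a WatcherEvent.

Both WatcherEvent and WatchedEvent wrap the server-side event in an extremely simple way: they carry only the server state at the time the event fired, the event type, and the path of the znode involved.

For example:

       KeeperState: SyncConnected

       EventType: NodeDataChanged

       Path: /test

What the node's data changed from and to cannot be learned from the Watcher notification; the client has to read the data again.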
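To make the "read it again" step concrete, here is a minimal sketch in plain Java. It does not use the ZooKeeper client library: SketchEvent and RefreshingCache are hypothetical names, and a Map stands in for the data held by the server. The point is only that the handler receives a state, a type and a path, and has to fetch the new value itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a notification: state, type and path only.
final class SketchEvent {
    final String state;
    final String type;
    final String path;

    SketchEvent(String state, String type, String path) {
        this.state = state;
        this.type = type;
        this.path = path;
    }
}

// Hypothetical client-side cache that refreshes itself on notification.
final class RefreshingCache {
    private final Map<String, String> store;              // stands in for server data
    private final Map<String, String> cache = new HashMap<>();

    RefreshingCache(Map<String, String> store) {
        this.store = store;
    }

    // The event does not say what the data changed to, so read it again.
    void onEvent(SketchEvent event) {
        if ("NodeDataChanged".equals(event.type)) {
            cache.put(event.path, store.get(event.path));
        }
    }

    String cached(String path) {
        return cache.get(path);
    }
}
```

With a real client the re-read would be another getData call, which is also the natural place to register the next Watcher.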

KeeperState: the ZooKeeper state when the event occurs

    /**
     * Enumeration of states the ZooKeeper may be at the event
     */
    public enum KeeperState {
        /** Unused, this state is never generated by the server */
        @Deprecated
        Unknown (-1),

        /** The client is in the disconnected state - it is not connected
         * to any server in the ensemble. */
        Disconnected (0),

        /** Unused, this state is never generated by the server */
        @Deprecated
        NoSyncConnected (1),

        /** The client is in the connected state - it is connected
         * to a server in the ensemble (one of the servers specified
         * in the host connection parameter during ZooKeeper client
         * creation). */
        SyncConnected (3),

        /**
         * Auth failed state
         */
        AuthFailed (4),

        /**
         * The client is connected to a read-only server, that is the
         * server which is not currently connected to the majority.
         * The only operations allowed after receiving this state is
         * read operations.
         * This state is generated for read-only clients only since
         * read/write clients aren't allowed to connect to r/o servers.
         */
        ConnectedReadOnly (5),

        /**
         * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
         * so that they can perform Zookeeper actions with their SASL-authorized permissions.
         */
        SaslAuthenticated(6),

        /** The serving cluster has expired this session. The ZooKeeper
         * client connection (the session) is no longer valid. You must
         * create a new client connection (instantiate a new ZooKeeper
         * instance) if you wish to access the ensemble. */
        Expired (-112);
    }

EventType: the type of event on ZooKeeper

    /**
     * Enumeration of types of events that may occur on the ZooKeeper
     */
    public enum EventType {
        None (-1),
        NodeCreated (1),
        NodeDeleted (2),
        NodeDataChanged (3),
        NodeChildrenChanged (4);
    }

Note that NodeDataChanged signals a change of the node's dataVersion: as the setData code later in this article shows, the Watcher is triggered on every write, whether or not the content actually differs. NodeChildrenChanged signals a change in the node's list of children, that is, a child being added or removed; a change to a child's data does not trigger it.
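The rule can be written down as a small lookup. The sketch below is a model in plain Java, not ZooKeeper code: EventRules and eventsFor are hypothetical names, and the create and delete cases reflect how I understand the server to behave (an event on the node itself plus NodeChildrenChanged on its parent), which the text above does not spell out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of which event types one write operation produces.
final class EventRules {
    // Returns "EventType@path" strings for one operation on `path`.
    static List<String> eventsFor(String op, String path) {
        int slash = path.lastIndexOf('/');
        String parent = slash == 0 ? "/" : path.substring(0, slash);
        List<String> events = new ArrayList<>();
        switch (op) {
            case "create":
                events.add("NodeCreated@" + path);
                events.add("NodeChildrenChanged@" + parent);
                break;
            case "delete":
                events.add("NodeDeleted@" + path);
                events.add("NodeChildrenChanged@" + parent);
                break;
            case "setData":
                // A data change on a child is not reported to the parent.
                events.add("NodeDataChanged@" + path);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
        return events;
    }
}
```

For example, eventsFor("setData", "/a/b") contains a single entry for /a/b, so a child Watcher on /a stays silent.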

How does the client register a Watcher?

When creating a ZooKeeper instance, you can pass a default Watcher to the constructor. One constructor is shown below (the other overloads are omitted):

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly) throws IOException {
        // set the default Watcher
        watchManager.defaultWatcher = watcher;
        // parse the connect string
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        // holds the ZooKeeper server addresses
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        // manages the client's socket I/O
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

This stores the default Watcher in the defaultWatcher field of ZKWatchManager. Besides this, a Watcher can also be registered through the getData, exists and getChildren calls. The registration logic is the same whichever call is used; getData is used here as the example.

The getData methods are declared as follows (thrown exceptions and similar details removed):

    public byte[] getData(String path, boolean watch, Stat stat);
    public byte[] getData(final String path, Watcher watcher, Stat stat);

The detailed logic when a Watcher is registered through getData is:

    public byte[] getData(final String path, Watcher watcher, Stat stat)
            throws KeeperException, InterruptedException {
        final String clientPath = path;
        // validate the path
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
        // derive the server-side path, mainly by prepending the chroot prefix
        final String serverPath = prependChroot(clientPath);
        // request header
        RequestHeader h = new RequestHeader();
        // set the operation type
        h.setType(ZooDefs.OpCode.getData);
        // request body for this operation (the protocol is covered later)
        GetDataRequest request = new GetDataRequest();
        // set the server-side path
        request.setPath(serverPath);
        // mark whether a Watcher is registered; this is only a flag,
        // the Watcher itself is not transmitted
        request.setWatch(watcher != null);
        // response body
        GetDataResponse response = new GetDataResponse();
        // send the request over the network and get the reply header back
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }

As the code above shows, the call that does the work is:

    // call site in ZooKeeper
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

    // submitRequest method in ClientCnxn
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }

    // queuePacket method in ClientCnxn
    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration) {
        Packet packet = null;
        // The Packet has no xid yet; it is generated at send time,
        // in ClientCnxnSocket::doIO()
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                // add to the outgoing queue to wait for sending
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }

In ZooKeeper, a Packet can be seen as the smallest unit of the communication protocol between client and server: any object to be transmitted must be wrapped in a Packet. The WatchRegistration is also wrapped in the Packet, which is then placed in the outgoing queue to wait for sending.

The ZooKeeper client then sends the request and waits for the response. When the response arrives, it is handled by SendThread's readResponse method, and the finishPacket method takes the Watcher from the Packet and registers it in ZKWatchManager.

    private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            // register in ZKWatchManager
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            // handled by the event thread
            eventThread.queuePacket(p);
        }
    }

    // The WatchRegistration.register method:
    /**
     * Register the watcher with the set of watches on path.
     * @param rc the result code of the operation that attempted to
     * add the watch on the path.
     */
    public void register(int rc) {
        if (shouldAddWatch(rc)) {
            Map<String, Set<Watcher>> watches = getWatches(rc);
            synchronized(watches) {
                Set<Watcher> watchers = watches.get(clientPath);
                if (watchers == null) {
                    watchers = new HashSet<Watcher>();
                    watches.put(clientPath, watchers);
                }
                watchers.add(watcher);
            }
        }
    }
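The important detail in register is that the Watcher is stored on the client only after the reply has arrived, and only if the result code allows it. The following is a simplified model of that step in plain Java. ClientWatchTable is a hypothetical name, watchers are represented by strings, and the rule "rc == 0" is an assumption covering only a data watch set by a successful read.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the client-side table filled in by register(rc).
final class ClientWatchTable {
    // path -> watcher names; stands in for the dataWatches map
    final Map<String, Set<String>> dataWatches = new HashMap<>();

    // Called when the reply arrives; rc is the error code from the reply header.
    void register(String clientPath, String watcher, int rc) {
        if (rc != 0) {
            return; // the request failed, so nothing is recorded
        }
        synchronized (dataWatches) {
            dataWatches.computeIfAbsent(clientPath, p -> new HashSet<>()).add(watcher);
        }
    }
}
```

Registering the same watcher twice on one path leaves a single entry, because the table holds a Set per path, as in the original code.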

The registration flow is shown below:

Note: the Watcher registered by the client is never sent to the server. The createBB method of Packet shows that the Watcher is not handled during serialization:

    public void createBB() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            boa.writeInt(-1, "len"); // We'll fill this in later
            if (requestHeader != null) {
                // serialize the header
                requestHeader.serialize(boa, "header");
            }
            if (request instanceof ConnectRequest) {
                request.serialize(boa, "connect");
                boa.writeBool(readOnly, "readOnly");
            } else if (request != null) {
                // serialize the request
                request.serialize(boa, "request");
            }
            baos.close();
            this.bb = ByteBuffer.wrap(baos.toByteArray());
            this.bb.putInt(this.bb.capacity() - 4);
            this.bb.rewind();
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
    }

Only the header and the request are serialized (the request carries the flag that says whether a Watcher is registered). The Watcher itself is not serialized, so it is never transmitted to the server.
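The same framing idea can be tried out with the JDK alone. The sketch below writes a placeholder length, then a header and a request, and patches the real length in afterwards, as createBB does. It is not ZooKeeper's actual wire format: FrameSketch and its field layout (an int type, a length-prefixed path, a one-byte watch flag) are assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical frame: [length][type][path length][path bytes][watch flag].
final class FrameSketch {
    static ByteBuffer encode(int type, String path, boolean watch) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);                 // placeholder for the length
            out.writeInt(type);               // "header": operation type only
            byte[] p = path.getBytes(StandardCharsets.UTF_8);
            out.writeInt(p.length);           // "request": the path ...
            out.write(p);
            out.writeBoolean(watch);          // ... plus the watch flag
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4);     // patch in the real length
            bb.rewind();
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Whatever callback object the caller holds never enters the buffer; the only trace of it on the wire is the single boolean.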

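But if the Watcher is not sent to the server, how does the server know that a Watcher exists, and how does it deliver the notification?

Sequence diagram of Watcher handling on the ZooKeeper server

FinalRequestProcessor.processRequest() checks whether the current request needs a Watcher registered; this is determined by the flag the client sent.

    case OpCode.getData: {
        /*
         * part of the code omitted here
         */
        // getDataRequest.getWatch() == true means a Watcher must be registered
        byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                getDataRequest.getWatch() ? cnxn : null);
        rsp = new GetDataResponse(b, stat);
        break;
    }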

If a Watcher is needed, the current ServerCnxn is what gets registered. Here is the definition of ServerCnxn:

    /**
     * Interface to a Server connection - represents a connection from a client
     * to the server.
     */
    public abstract class ServerCnxn implements Stats, Watcher {
        // part of the code omitted
        public abstract void process(WatchedEvent event);
        // part of the code omitted
    }

On the server, Watchers are managed centrally by WatchManager:

    /**
     * This class manages watches. It allows watches to be associated with a string
     * and removes watchers and their watches in addition to managing triggers.
     */
    public class WatchManager {
        private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

        // Watchers indexed by data node: the key is the path, the value is
        // the set of Watchers registered on that node
        private final HashMap<String, HashSet<Watcher>> watchTable =
            new HashMap<String, HashSet<Watcher>>();

        // Watchers indexed the other way round: for each Watcher, the paths
        // of the data nodes it watches
        private final HashMap<Watcher, HashSet<String>> watch2Paths =
            new HashMap<Watcher, HashSet<String>>();

        public synchronized void addWatch(String path, Watcher watcher) {
            HashSet<Watcher> list = watchTable.get(path);
            if (list == null) {
                // don't waste memory if there are few watches on a node
                // rehash when the 4th entry is added, doubling size thereafter
                // seems like a good compromise
                list = new HashSet<Watcher>(4);
                watchTable.put(path, list);
            }
            list.add(watcher);

            HashSet<String> paths = watch2Paths.get(watcher);
            if (paths == null) {
                // cnxns typically have many watches, so use default cap here
                paths = new HashSet<String>();
                watch2Paths.put(watcher, paths);
            }
            paths.add(path);
        }
        // part of the code omitted
    }
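Why keep two maps? One plausible reason is cleanup: when a connection goes away, the reverse map gives all of its paths directly, without scanning every node. The sketch below models that with plain strings in place of Watcher objects. TwoWayWatchIndex is a hypothetical name, and removeWatcher here is my own simplified version of the part omitted above, not the original code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical two-way index: path -> connections and connection -> paths.
final class TwoWayWatchIndex {
    private final Map<String, Set<String>> watchTable = new HashMap<>();
    private final Map<String, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, String conn) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(conn);
        watch2Paths.computeIfAbsent(conn, k -> new HashSet<>()).add(path);
    }

    // Drop every watch held by one connection, e.g. when it closes.
    synchronized void removeWatcher(String conn) {
        Set<String> paths = watch2Paths.remove(conn);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<String> conns = watchTable.get(path);
            if (conns != null) {
                conns.remove(conn);
                if (conns.isEmpty()) {
                    watchTable.remove(path);
                }
            }
        }
    }

    synchronized int watchedPathCount() {
        return watchTable.size();
    }
}
```

After addWatch("/a", "c1"), addWatch("/b", "c1") and addWatch("/a", "c2"), calling removeWatcher("c1") leaves only /a watched, by c2.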

Triggering a Watcher

The previous section showed that registration stores the ServerCnxn in WatchManager. One of the event types (EventType) introduced earlier is NodeDataChanged, whose trigger condition is "the content of the data node watched by the Watcher changes". The implementation of setData is shown below:

    public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        // node metadata
        Stat s = new Stat();
        // look up the node first
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            lastdata = n.data;
            // set the node data
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
            this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                - (lastdata == null ? 0 : lastdata.length));
        }
        // trigger the Watcher
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

After the data is set, the triggerWatch method of WatchManager is called to trigger the Watcher:

    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        // 1. wrap the event
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            // 2. look up the Watchers and remove them
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // 3. call the process method of the Watcher interface
            w.process(e);
        }
        return watchers;
    }

Because step 2 removes the Watchers from watchTable before they are called, a Watcher is one-shot: once it has fired it is no longer registered.
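The one-shot behaviour follows directly from "remove first, then call". A minimal model in plain Java, with hypothetical names (OneShotWatches, trigger) and java.util.function.Consumer standing in for the Watcher interface:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of a one-shot watch table.
final class OneShotWatches {
    private final Map<String, Set<Consumer<String>>> table = new HashMap<>();

    synchronized void addWatch(String path, Consumer<String> watcher) {
        table.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // Removes the watchers for `path` before calling them, so each fires once.
    // Returns the number of watchers that were notified.
    int trigger(String path) {
        Set<Consumer<String>> watchers;
        synchronized (this) {
            watchers = table.remove(path);
        }
        if (watchers == null) {
            return 0;
        }
        for (Consumer<String> w : watchers) {
            w.accept(path);
        }
        return watchers.size();
    }
}
```

A second trigger on the same path notifies nobody unless addWatch has been called again in between, which is why code that wants continuous notification has to register again after each event.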

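As noted above, what is actually registered is the ServerCnxn. Its implementations include NIOServerCnxn and NettyServerCnxn, whose logic is much the same; NIOServerCnxn is used here as the example.

    // this method has been trimmed: logging and the like are removed
    public void process(WatchedEvent event) {
        // the arguments are xid, zxid and err; setting xid to -1 in the
        // ReplyHeader marks this as a notification
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        // wrap the WatchedEvent as a WatcherEvent so it can be serialized
        WatcherEvent e = event.getWrapper();
        try {
            // send the response back to the client
            sendResponse(h, e, "notification");
        } catch (IOException e1) {
            close();
        }
    }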

Watcher callback on the client

The notification is received by the readResponse method of SendThread.

    void readResponse(ByteBuffer incomingBuffer) throws IOException {
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ReplyHeader replyHdr = new ReplyHeader();
        // deserialize the header
        replyHdr.deserialize(bbia, "header");

        // *** part of the code omitted ***

        // -1 marks this as a notification
        if (replyHdr.getXid() == -1) {
            WatcherEvent event = new WatcherEvent();
            event.deserialize(bbia, "response");

            // convert from a server path to a client path
            if (chrootPath != null) {
                String serverPath = event.getPath();
                if(serverPath.compareTo(chrootPath)==0)
                    event.setPath("/");
                else if (serverPath.length() > chrootPath.length())
                    event.setPath(serverPath.substring(chrootPath.length()));
                else {
                    LOG.warn("Got server path " + event.getPath()
                            + " which is too short for chroot path "
                            + chrootPath);
                }
            }
            // rebuild the WatchedEvent
            WatchedEvent we = new WatchedEvent(event);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got " + we + " for sessionid 0x"
                        + Long.toHexString(sessionId));
            }
            // add to the event queue for the later callback
            eventThread.queueEvent( we );
            return;
        }

        // *** part of the code omitted ***
    }
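The chroot conversion in the middle of readResponse is easy to isolate and test on its own. The helper below restates that branch as a pure function; ChrootPaths and toClientPath are hypothetical names, and the "too short" case simply returns the path unchanged where the original logs a warning.

```java
// Hypothetical helper restating the server-path to client-path conversion.
final class ChrootPaths {
    static String toClientPath(String chrootPath, String serverPath) {
        if (chrootPath == null) {
            return serverPath;                 // no chroot configured
        }
        if (serverPath.equals(chrootPath)) {
            return "/";                        // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath;                     // too short for the chroot: left unchanged
    }
}
```

With a chroot of /app, the server path /app/node becomes /node on the client, which matches the un-chrooted path that was used as the key when the Watcher was registered.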

The queueEvent method of EventThread is as follows:

    public void queueEvent(WatchedEvent event) {
        if (event.getType() == EventType.None
                && sessionState == event.getState()) {
            return;
        }
        // record the session state
        sessionState = event.getState();

        // materialize the watchers based on the event
        WatcherSetEventPair pair = new WatcherSetEventPair(
                watcher.materialize(event.getState(), event.getType(),
                        event.getPath()),
                event);
        // queue the pair (watch set & event) for later processing
        waitingEvents.add(pair);
    }

As shown, the core is the materialize method of ZKWatchManager.

The interface is defined as follows:

    /**
     */
    public interface ClientWatchManager {
        /**
         * Return a set of watchers that should be notified of the event. The
         * manager must not notify the watcher(s), however it will update it's
         * internal structure as if the watches had triggered. The intent being
         * that the callee is now responsible for notifying the watchers of the
         * event, possibly at some later time.
         *
         * @param state event state
         * @param type event type
         * @param path event path
         * @return may be empty set but must not be null
         */
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
            Watcher.Event.EventType type, String path);
    }

The implementation of materialize in ZKWatchManager is:

    @Override
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath) {
        Set<Watcher> result = new HashSet<Watcher>();

        switch (type) {
        // *** part of the code omitted ***
        case NodeDataChanged:
        case NodeCreated:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        // *** part of the code omitted ***
        }
        return result;
    }

Once the client has identified the event type (EventType), it takes the matching Watchers out of the corresponding stores (one or more of dataWatches, existWatches and childWatches). Because this is done with remove, the Watcher on the client side is one-shot as well. After the Watchers have been resolved, they are placed in the waitingEvents queue of EventThread to be consumed by that thread.
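The lookup-and-remove step can be modelled with three maps. In the sketch below, ClientTables is a hypothetical class and watchers are plain strings. Only the NodeDataChanged/NodeCreated branch comes from the excerpt above; the NodeChildrenChanged branch is my assumption about one of the omitted cases, and the remaining event types are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the three client-side watch tables.
final class ClientTables {
    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();
    final Map<String, Set<String>> childWatches = new HashMap<>();

    // Collects the watchers to notify and removes them from the tables.
    Set<String> materialize(String type, String clientPath) {
        Set<String> result = new HashSet<>();
        switch (type) {
            case "NodeDataChanged":
            case "NodeCreated":
                addTo(dataWatches.remove(clientPath), result);
                addTo(existWatches.remove(clientPath), result);
                break;
            case "NodeChildrenChanged":       // assumed, not shown in the excerpt
                addTo(childWatches.remove(clientPath), result);
                break;
            default:
                break;                        // other event types are outside this sketch
        }
        return result;
    }

    private static void addTo(Set<String> from, Set<String> to) {
        if (from != null) {
            to.addAll(from);
        }
    }
}
```

The method never returns null, in line with the contract stated in the interface comment; it returns an empty set when nothing is registered for the path.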

The run method of EventThread is as follows (logging and the like removed):

    @Override
    public void run() {
        try {
            isRunning = true;
            while (true) {
                Object event = waitingEvents.take();
                if (event == eventOfDeath) {
                    wasKilled = true;
                } else {
                    // handle the event
                    processEvent(event);
                }
                if (wasKilled)
                    synchronized (waitingEvents) {
                        if (waitingEvents.isEmpty()) {
                            isRunning = false;
                            break;
                        }
                    }
            }
        } catch (InterruptedException e) {
        }
    }

The processEvent method is as follows:

    private void processEvent(Object event) {
        try {
            if (event instanceof WatcherSetEventPair) {
                WatcherSetEventPair pair = (WatcherSetEventPair) event;
                for (Watcher watcher : pair.watchers) {
                    try {
                        // call process to handle the event
                        watcher.process(pair.event);
                    } catch (Throwable t) {
                    }
                }
            } else {
                // omitted
            }
        } catch (Throwable t) {
            // omitted
        }
    }

The code shows that the EventThread takes one event at a time from the waitingEvents queue, together with its set of Watchers, and calls those Watchers serially and synchronously. These Watchers are the ones the client actually registered.
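The single-consumer pattern behind this can be reproduced with a LinkedBlockingQueue and one thread. The sketch below is a simplified model: SerialEventLoop is a hypothetical class, events are plain strings, and the shutdown marker ends the loop immediately instead of going through the wasKilled flag used in the original.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-threaded event loop with a shutdown marker.
final class SerialEventLoop {
    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::run, "sketch-event-thread");

    void start() {
        worker.start();
    }

    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    // Queues the marker behind all pending events and waits for the thread.
    void shutdown() {
        waitingEvents.add(EVENT_OF_DEATH);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void run() {
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    return;
                }
                handled.add((String) event);   // one callback at a time, in queue order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A practical consequence of this design is that a slow callback delays every callback queued behind it, so long-running work is better handed off to another thread.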

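Summary of Watcher characteristics

One-shot: whether on the client or on the server, a Watcher can be used only once; after it has fired it is no longer valid.

Serial on the client: Watcher callbacks on the client are executed serially and synchronously.

Lightweight: WatchedEvent is the smallest notification unit in ZooKeeper's Watcher mechanism. It contains only three parts: the notification state, the event type and the node path. It does not carry the details of the change. With this design the client also does not need to send the whole Watcher to the server when registering.

This article is only a record of personal study notes.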
