Zookeeper源码分析-watch机制

Zookeeper框架设计了一种叫做watch的机制,客户端可以给某个节点添加watcher,当节点改变的时候,服务端会推送一个消息到客户端。

在3.6.0之前的版本,node上的watcher都是一次性的,即当收到服务端的通知后,如果还想监听需要再次设置,在3.6.0之后,可以设置一个永久且递归的watcher。

这个技术是很有用的,比如在dubbo中,consumer端可以只关注自己感兴趣的provider,而不用关心全部的provider,本文就来梳理一下这个watch机制。

目前普通的方法,watch机制仅允许在getData、exist、getChildren这三个方法调用的时候进行添加。或者直接调用addWatch方法给指定节点添加watcher。

当我们使用的时候,可以自定义watcher的逻辑,也可以使用创建zookeeper时指定的watcher逻辑。

//使用默认的watch
zookeeper.getData(nodePath, true, null);
zookeeper.addWatch(nodePath,AddWatchMode.PERSISTENT);
//使用自定义逻辑
zookeeper.getData(nodePath, watchedEvent->{
            if(watchedEvent.getType()==Watcher.Event.EventType.NodeCreated){
                System.out.println("新增节点"+watchedEvent.getPath());
            }
            if(watchedEvent.getType()==Watcher.Event.EventType.NodeDeleted){
                System.out.println("删除节点"+watchedEvent.getPath());
            }
        },null);
zookeeper.addWatch(nodePath,watchedEvent->{
            //业务逻辑
        },AddWatchMode.PERSISTENT);

客户端

zookeeper的客户端本质上也是一个socket客户端,主要分为两个线程,一个是sendThread,主要用于发送和接受消息。另一个则是eventThread,就是用来处理诸如watcher的回调信息的。

当我们通过getData方法添加了一个watcher时,底层做了两件事情:

一是在request中标识watch为true,告诉服务端我对于这个路径注册了watcher

二是当请求执行完的时候,将watcher添加到watchManager中的集合中去。

当收到服务端回调的时候,本质上是一个code为NOTIFICATION_XID的消息,在sendThread接收到消息后,会通过调用EventThread的queueEvent方法通过队列解耦,让eventThread去异步处理事件

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
            if (event.getType() == EventType.None && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // 会根据路径找到本地存储的对应的watcher
                watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
            } else {
                watchers = new HashSet<>(materializedWatchers);
            }
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // 添加到队列去异步处理
            waitingEvents.add(pair);
        }

服务端

服务端核心的类是WatchManager。在DataTree类中实例化了两个对象,分别是dataWatchers(对应getData)和childWatches(对应getChildren)

而服务端又将watcher分为三种类型:普通、持久、持久且递归,后两种只能通过zookeeper.addWatch来指定

public enum WatcherMode {
    STANDARD(false, false),
    PERSISTENT(true, false),
    PERSISTENT_RECURSIVE(true, true),
    ;

当接收到getData、exist、getChildren、setWatcher的请求时,会调用WatchManager的两个addWatch重载方法中的一个,将其添加到watchTable的map中,key为nodePath,value是watcher的集合。

这里需要强调一下!map中的value虽然类型是Watcher,但是其本质上确实一个socket连接,即客户端和服务端的连接。

以getData为例追溯一下这个watcher的赋值

这里又对应上客户端组装request时,给watch赋值的情况

当发生了对应的事件,比如createNode、deleteNode、setData等变更事件时,会调用WatchManager.triggerWatch方法来触达

这里先了解一下WatchStats是什么,其实就是各个模式的组合,然后看源码

public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
        Set<Watcher> watchers = new HashSet<>();
        synchronized (this) {
            PathParentIterator pathParentIterator = getPathParentIterator(path);
            for (String localPath : pathParentIterator.asIterable()) {
                //取出path对应的watcher
                Set<Watcher> thisWatchers = watchTable.get(localPath);
                if (thisWatchers == null || thisWatchers.isEmpty()) {
                    continue;
                }
                Iterator<Watcher> iterator = thisWatchers.iterator();
                while (iterator.hasNext()) {
                    Watcher watcher = iterator.next();
                    //取出watch对应的监听模式
                    Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
                    WatchStats stats = paths.get(localPath);
                    if (stats == null) {
                        LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);
                        continue;
                    }
                    if (!pathParentIterator.atParentPath()) {
                        //添加到待处理的集合
                        watchers.add(watcher);
                        //移除标准模式的watcher,这也是为什么普通的watcher是一次性的原因
                        WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
                        //如果完全没有任何模式的监听,则移除该watcher
                        if (newStats == WatchStats.NONE) {
                            iterator.remove();
                            paths.remove(localPath);
                        } else if (newStats != stats) {
                            paths.put(localPath, newStats);
                        }
                        //处理持久且递归的情况
                    } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
                        watchers.add(watcher);
                    }
                }
                if (thisWatchers.isEmpty()) {
                    watchTable.remove(localPath);
                }
            }
        }
        ...
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            //通知客户端
            w.process(e);
        }
//最终其实就是发送了一个code为NOTIFICATION_XID的消息到客户端
public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
        WatcherEvent e = event.getWrapper();
        int responseSize = sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
        ServerMetrics.getMetrics().WATCH_BYTES.add(responseSize);
    }

至此,和客户端完成闭环。