ZooKeeper Source Code Study: Standalone-Mode Communication Components

Preparation Before Debugging

For most open-source projects you can work out the Java entry class from the startup shell script. After cloning the source I created a branch from tag 3.9.1, to make adding comments and debugging easier.

In the bin/zkServer.sh script, the startup logic looks like the excerpt below. Running ./zkServer.sh start ultimately executes a command roughly of the form:

nohup java [many arguments] org.apache.zookeeper.server.quorum.QuorumPeerMain [log redirection]. Tracing the ZOOMAIN variable upward, all four places where it appears ultimately resolve to QuorumPeerMain, so QuorumPeerMain is ZooKeeper's entry class.

case $1 in
start)
    echo  -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 1
      fi
    fi
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
    # note: $ZOOMAIN = org.apache.zookeeper.server.quorum.QuorumPeerMain
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

Once the entry class is known, the next step is debugging. A few points deserve attention here:

  1. Set the program arguments

    The shell script concatenates a large number of arguments when it launches the JVM. Most of them can be ignored, but the config argument is required: the "$ZOOCFG" that follows $ZOOMAIN. So when debugging in IDEA or a similar IDE, set the program arguments to the path of the zk config file (my zoo.cfg is a copy of zoo_sample.cfg).

  2. Classes not found at startup / build failures

    This happens because some dependencies in the pom have scope `provided`, meaning the ZooKeeper project does not bundle them and expects the consumer to supply them. To keep things simple, I commented out every <scope>provided</scope> in zookeeper-server's pom.xml.

  3. When client-server communication is involved, set the session timeout generously; otherwise sessions keep expiring and connections get closed while you sit at breakpoints.

    Since the session timeout has a configured maximum, it is easiest to temporarily comment out the max-limit check as well.

Source Code Analysis

QuorumPeerMain

The logic of QuorumPeerMain is fairly simple; the point to note is that cluster mode and standalone mode take different code paths. I'll analyze standalone mode first to get familiar with the structure, then look at cluster mode.

public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } // ... exception handling elided
}
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
        // args[0] is usually the zk config file zoo.cfg; parse it into a Java bean
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the purge task
        // periodically runs PurgeTask to purge old data and datalog files
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
        purgeMgr.start();

        // cluster vs. standalone mode
        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
            // there is only server in the quorum -- run as standalone
            // standalone mode
            ZooKeeperServerMain.main(args);
        }
    }

ZooKeeperServerMain

The main components of the standalone server are shown in the figure above.

AdminServer: ZooKeeper implements a simple admin backend with Jetty; you can run commands against it via http://localhost:8080/commands.

MetricsProvider: records and exposes metrics; not analyzed in this article.

JvmPauseMonitor: a monitor borrowed from Hadoop; a dedicated thread loops forever and logs a message whenever the observed pause exceeds a threshold.

JMX: the standard JMX facility provided by Java.

QuorumPeerConfig: parses zoo.cfg; serves ZooKeeper's cluster mode.

ServerConfig: also parses zoo.cfg, but only for standalone mode, and mainly serves the communication components.
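The pause-detection idea behind JvmPauseMonitor can be sketched in a few lines: sleep for a fixed interval, measure how long the sleep actually took, and treat any surplus as a JVM/host pause (GC, swap, etc.). This is only a sketch; the class, constants, and thresholds below are mine, not ZooKeeper's.

```java
// Hypothetical sketch of the JvmPauseMonitor idea. Names and thresholds are
// illustrative, not ZooKeeper's actual fields.
public class PauseMonitorSketch {
    static final long SLEEP_MS = 100;  // how long we intend to sleep each round
    static final long WARN_MS = 1000;  // pause length that triggers a log line

    // Pure decision helper: given intended and actual elapsed time,
    // how long was the unexpected pause?
    static long pauseMillis(long intendedMs, long actualMs) {
        return Math.max(0, actualMs - intendedMs);
    }

    static boolean shouldWarn(long pauseMs) {
        return pauseMs > WARN_MS;
    }

    public static void main(String[] args) throws InterruptedException {
        // one iteration of the monitor loop
        long before = System.nanoTime();
        Thread.sleep(SLEEP_MS);
        long actualMs = (System.nanoTime() - before) / 1_000_000;
        long pause = pauseMillis(SLEEP_MS, actualMs);
        if (shouldWarn(pause)) {
            System.out.println("Detected pause in JVM or host machine: " + pause + "ms");
        }
    }
}
```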

Startup Entry Point

final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null, config.initialConfig);

boolean needStartZKServer = true;
  if (config.getClientPortAddress() != null) {
    cnxnFactory = ServerCnxnFactory.createFactory(); // defaults to NIOServerCnxnFactory
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
    // startup entry point
    cnxnFactory.startup(zkServer);
    // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
    needStartZKServer = false;
  }

public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
        // start the socket-related threads
        start();
        setZooKeeperServer(zks);
        if (startServer) {
            // load the ZKDatabase
            zks.startdata();
            // start the remaining components (throttling, processors, sessions, JMX, etc.)
            zks.startup();
        }
}

Communication Flow

ServerCnxnFactory

public void start() {
        stopped = false;
        if (workerPool == null) {
            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
        }
        for (SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
        // ensure thread is started once and only once
        if (acceptThread.getState() == Thread.State.NEW) {
            acceptThread.start();
        }
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }

ServerCnxnFactory owns the socket-related work. As the start method shows, zk's server socket handling is split across four kinds of threads: acceptThread listens for accept events, selectorThread watches read/write readiness, workerPool does the actual IO handling, and expirerThread closes expired connections.

AcceptThread

The AcceptThread handles only accept events, and hands new connections off through a queue to decouple itself from the selectors.
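The round-robin handoff inside doAccept can be isolated into a toy sketch: an iterator over the selector threads that is reset whenever it runs out. The class below is mine; simple counters stand in for real SelectorThread objects.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Minimal sketch of the round-robin assignment pattern in doAccept.
// Each int[1] counts connections assigned to one "selector".
public class RoundRobinSketch {
    private final List<int[]> selectors;
    private Iterator<int[]> it;

    public RoundRobinSketch(int nSelectors) {
        selectors = Arrays.asList(new int[nSelectors][1]);
        it = selectors.iterator();
    }

    // Mirrors doAccept: restart the iterator when exhausted, then take the next selector.
    public int assign() {
        if (!it.hasNext()) {
            it = selectors.iterator();
        }
        int[] chosen = it.next();
        chosen[0]++;                       // "hand the connection to this selector"
        return selectors.indexOf(chosen);  // which selector got it
    }

    public int load(int i) {
        return selectors.get(i)[0];
    }
}
```

Assigning five connections across two selectors spreads them 3/2, which is exactly the property the real code wants: no selector thread is systematically favored.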

private boolean doAccept() {
            boolean accepted = false;
            SocketChannel sc = null;
            try {
                // obtain the SocketChannel
                sc = acceptSocket.accept();
                accepted = true;
                // (omitted here) check whether the max connection count has been reached

                sc.configureBlocking(false);
                // after accepting, assign the connection to a selector thread
                // Round-robin assign this connection to a selector thread
                if (!selectorIterator.hasNext()) {
                    selectorIterator = selectorThreads.iterator();
                }
                SelectorThread selectorThread = selectorIterator.next();
                // hand the newly accepted socket to the selector
                if (!selectorThread.addAcceptedConnection(sc)) {
                    throw new IOException("Unable to add connection to selector queue"
                                          + (stopped ? " (shutdown in progress)" : ""));
                }
                acceptErrorLogger.flush();
            } // ... catch blocks elided
            return accepted;
        }

public boolean addAcceptedConnection(SocketChannel accepted) {
            // put the socket into acceptedQueue; the selector thread will pick it up
            if (stopped || !acceptedQueue.offer(accepted)) {
                return false;
            }
            // wake up the select() call the selector thread may be blocked in
            wakeupSelector();
            return true;
        }
SelectorThread

public void run() {
            try {
                while (!stopped) {
                    try {
                        // handle read/write IO events
                        select();
                        // register read interest for newly accepted sockets
                        processAcceptedConnections();
                        // I haven't fully understood this method yet
                        processInterestOpsUpdateRequests();
                    } // ... catch elided
                }
            } // ... catch elided
}
private void select() {
            try {
                // woken either by an IO event or by wakeupSelector(); the loop below
                // may or may not find ready keys -- ready keys are handled first, and
                // when none remain we fall out of select() to register read interest
                // for newly accepted sockets
                selector.select();

                Set<SelectionKey> selected = selector.selectedKeys();
                ArrayList<SelectionKey> selectedList = new ArrayList<>(selected);
                Collections.shuffle(selectedList);
                Iterator<SelectionKey> selectedKeys = selectedList.iterator();
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selected.remove(key);

                    if (!key.isValid()) {
                        cleanupSelectionKey(key);
                        continue;
                    }
                    if (key.isReadable() || key.isWritable()) {
                        // handle IO: actually wraps the key as a work item for the workerPool
                        handleIO(key);
                    } else {
                        LOG.warn("Unexpected ops in select {}", key.readyOps());
                    }
                }
            } // ... catch elided
        }
private void processAcceptedConnections() {
            SocketChannel accepted;
            while (!stopped && (accepted = acceptedQueue.poll()) != null) {
                SelectionKey key = null;
                try {
                    // register read interest for this socket
                    key = accepted.register(selector, SelectionKey.OP_READ);
                    // create the connection object from the SocketChannel
                    NIOServerCnxn cnxn = createConnection(accepted, key, this);
                    // the key holds the cnxn so the IO-handling code can get it back later
                    key.attach(cnxn);
                    addCnxn(cnxn);
                } catch (IOException e) {
                    // register, createConnection
                    cleanupSelectionKey(key);
                    fastCloseSock(accepted);
                }
            }
        }
ExpirerThread
        public void run() {
            try {
                while (!stopped) {
                    long waitTime = cnxnExpiryQueue.getWaitTime();
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                        continue;
                    }
                    // close expired connections
                    for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
                        ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1);
                        conn.close(ServerCnxn.DisconnectReason.CONNECTION_EXPIRED);
                    }
                }

            } catch (InterruptedException e) {
                LOG.info("ConnnectionExpirerThread interrupted");
            }
        }

The expirer thread's run logic is simple: it takes expired connections from a queue and closes them. The server extends a connection's expiry through the touchCnxn method, which is called when the connection is created and around each IO event.

Connections and sessions share the same expiry mechanism, so I postpone the analysis of cnxnExpiryQueue.poll and cnxnExpiryQueue.update to the SessionTracker section below.

    public void touchCnxn(NIOServerCnxn cnxn) {
        cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
    }
WorkerPool

The worker threads' core task is to read the data off the socket, wrap it as a request, and hand it to the processors.

void doIO(SelectionKey k) throws InterruptedException {
        try {
            // the first 4 bytes carry the message length
            if (k.isReadable()) {
                // read from the socket into incomingBuffer; rc < 0 means end-of-stream
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                        handleFailedRead();
                }
                // normally the length prefix occupies exactly 4 bytes, so at this point limit == position
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    // incomingBuffer == lenBuffer means we are at the start of a new request;
                    // they differ when a previous doIO call already read the length and
                    // allocated the payload buffer but the payload hasn't fully arrived yet
                    if (incomingBuffer == lenBuffer) { // start of next request
                        // flip to read mode so readLength can read the 4 length bytes just written
                        incomingBuffer.flip();
                        // incomingBuffer is reallocated inside readLength
                        isPayload = readLength(k);
                        incomingBuffer.clear();
                    } else {
                        // continuation
                        isPayload = true;
                    }
                    if (isPayload) { // not the case for 4letterword
                        // read the actual payload
                        readPayload();
                    } else {
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }
private boolean readLength(SelectionKey k) throws IOException {
        // Read the length, now get the buffer
        // pull the 4-byte payload length
        int len = lenBuffer.getInt();
        zkServer.checkRequestSizeWhenReceivingMessage(len);
        // allocate a payload buffer of exactly that length
        incomingBuffer = ByteBuffer.allocate(len);
        return true;
    }
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
        // a fresh buffer was just allocated, so remaining() != 0 here and the
        // rest of the socket data can be read into incomingBuffer
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                handleFailedRead();
            }
        }
        // incomingBuffer is now full, so flip it and read the data back out
        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            incomingBuffer.flip();
            packetReceived(4 + incomingBuffer.remaining());
            if (!initialized) {
                // initialized is set to true only inside this branch, so the first
                // request on a new connection always takes it
                readConnectRequest();
            } else {
                readRequest();
            }
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }
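The framing that doIO, readLength, and readPayload implement is a classic length-prefixed protocol: 4 big-endian length bytes followed by that many payload bytes. The sketch below shows both sides in isolation; encode/decode are my own helper names, not ZooKeeper API.

```java
import java.nio.ByteBuffer;

// Sketch of the wire framing doIO reads: a 4-byte length prefix, then the payload.
public class FramingSketch {
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);   // the role of lenBuffer on the write side
        buf.put(payload);
        return buf.array();
    }

    static byte[] decode(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        int len = buf.getInt();                         // readLength: pull the 4-byte length
        ByteBuffer incoming = ByteBuffer.allocate(len); // readLength: allocate exactly len bytes
        while (incoming.remaining() != 0) {
            incoming.put(buf.get());                    // readPayload: fill until remaining() == 0
        }
        incoming.flip();                                // switch to read mode
        byte[] out = new byte[incoming.remaining()];
        incoming.get(out);
        return out;
    }
}
```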

The first IO after a connection is established reaches readConnectRequest, which deserializes a ConnectRequest and hands it to processConnectRequest.

BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
zkServer.processConnectRequest(this, request);

Every subsequent read goes through readRequest instead, which wraps an ordinary RequestRecord and hands it to processPacket.

        RequestHeader h = new RequestHeader();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
        RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
        zkServer.processPacket(this, h, request);

The core work of processConnectRequest is creating the session and wrapping a createSession request for further processing.

if (sessionId == 0) {
            long id = createSession(cnxn, passwd, sessionTimeout);


    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        if (passwd == null) {
            // Possible since it's just deserialized from a packet on the wire.
            passwd = new byte[0];
        }
        long sessionId = sessionTracker.createSession(timeout);
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        CreateSessionTxn txn = new CreateSessionTxn(timeout);
        cnxn.setSessionId(sessionId);
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
        submitRequest(si); // so even the initial session creation ends up in submitRequest
        return sessionId;
    }
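The `Random(sessionId ^ superSecret)` trick above deserves a note: the session password is a pure function of the session id, so no server ever needs to store it; any server that knows the same secret can recompute it to validate a reconnecting client. A sketch of that property (the secret constant below is made up; ZooKeeper hard-codes its own):

```java
import java.util.Arrays;
import java.util.Random;

// Sketch of deterministic session-password derivation. SUPER_SECRET is
// illustrative; only the derivation pattern mirrors createSession.
public class SessionPasswdSketch {
    static final long SUPER_SECRET = 0xB3415C00L; // made-up constant

    static byte[] passwdFor(long sessionId) {
        byte[] passwd = new byte[16];
        // same seed -> same bytes, so the password never needs to be stored
        new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
        return passwd;
    }

    static boolean validate(long sessionId, byte[] presented) {
        return Arrays.equals(passwdFor(sessionId), presented);
    }
}
```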

processPacket performs authentication checks and then forwards the request.

public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
        // apart from the auth/sasl branches, this is plain request forwarding
        if (h.getType() == OpCode.auth) {
            ...
        } else if (h.getType() == OpCode.sasl) {
            ...
        } else {
            if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
                // Authentication enforcement is failed
                // Already sent response to user about failure and closed the session, lets return
                return;
            } else {
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
                int length = request.limit();
                if (isLargeRequest(length)) {
                    // checkRequestSize will throw IOException if request is rejected
                    checkRequestSizeWhenMessageReceived(length);
                    si.setLargeRequestSize(length);
                }
                si.setOwner(ServerCnxn.me);
                // ordinary requests enter here
                submitRequest(si);
            }
        }
    }

At this point we see that everything converges on submitRequest, whose logic is to push the request into the throttler.

    public void submitRequest(Request si) {
        if (restoreLatch != null) {
            try {
                LOG.info("Blocking request submission while restore is in progress");
                restoreLatch.await();
            } catch (final InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
        }
        enqueueRequest(si); // puts the request on the throttler's submittedRequests queue
    }
    public void enqueueRequest(Request si) {
        if (requestThrottler == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (requestThrottler == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        requestThrottler.submitRequest(si);
    }

RequestThrottler

The throttler is created and started in startupWithServerState:

    private void startupWithServerState(State state) {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        // manages each connection's session
        startSessionTracker();
        // chains the processors together as a singly linked list
        setupRequestProcessors();
        // the throttler; every request passes through it first
        startRequestThrottler();
    }
    protected void startRequestThrottler() {
        requestThrottler = createRequestThrottler();
        requestThrottler.start();
    }

As noted above, every request goes through requestThrottler.submitRequest, which simply places the request on the submittedRequests queue. The RequestThrottler is itself a thread that loops, taking requests off the queue and deciding whether to throttle them.

The code below does two main things:

First, while the number of in-flight requests is at the limit, it blocks until the count drops below maxRequests.

Second, if a request waited in the queue longer than throttled_op_wait_time, it is marked as throttled, and a later processor will fail it with an error.

public void run() {
        try {
            while (true) {
                if (killed) {
                    break;
                }
                // take a request off the queue
                Request request = submittedRequests.take();
                if (Request.requestOfDeath == request) {
                    break;
                }

                if (request.mustDrop()) {
                    continue;
                }

                // Throttling is disabled when maxRequests = 0
                if (maxRequests > 0) {
                    while (!killed) {
                        // the request sat so long that its connection closed or its session timed out: drop it
                        if (dropStaleRequests && request.isStale()) {
                            // Note: this will close the connection
                            dropRequest(request);
                            ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
                            request = null;
                            break;
                        }
                        // below the limit: break out of the wait loop
                        if (zks.getInProcess() < maxRequests) {
                            break;
                        }
                        // at the limit: wait stallTime before re-entering the loop, to avoid busy-spinning
                        throttleSleep(stallTime);
                    }
                }

                if (killed) {
                    break;
                }

                // A dropped stale request will be null
                if (request != null) {
                    if (request.isStale()) {
                        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                    }
                    final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
                    ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
                    // if the request waited in the queue longer than the throttle wait time, mark it
                    // throttled; FinalRequestProcessor answers throttled requests with Code.THROTTLEDOP
                    if (shouldThrottleOp(request, elapsedTime)) {
                      request.setIsThrottled(true);
                      ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
                    }
                    // hand it over to the processors
                    zks.submitRequestNow(request);
                }
            }
        }

If the request is not throttled, zks.submitRequestNow(request) hands it to the processor chain.
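The two throttling rules above can be distilled into two pure decision functions. The class, constants, and names below are illustrative only, standing in for maxRequests and throttled_op_wait_time:

```java
// Sketch of the RequestThrottler's two rules, stripped of threading and queues.
public class ThrottleSketch {
    static final int MAX_REQUESTS = 4;            // cf. maxRequests
    static final long THROTTLED_OP_WAIT_MS = 100; // cf. throttled_op_wait_time

    // Rule 1: may a request proceed right now, given the in-flight count?
    static boolean mayProceed(int inProcess) {
        return inProcess < MAX_REQUESTS;
    }

    // Rule 2: once it finally leaves the queue, should it be marked throttled?
    static boolean shouldThrottleOp(long elapsedQueueMs) {
        return elapsedQueueMs > THROTTLED_OP_WAIT_MS;
    }
}
```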

RequestProcessor

ZooKeeper splits request handling into three processors, wired up in setupRequestProcessors as a linked list with PrepRequestProcessor at the head and FinalRequestProcessor at the tail.
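Abstractly this is a chain of responsibility: each stage does its own work and forwards to nextProcessor, and the tail has no next. A toy version (processors here just record their names, which are mine):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy chain-of-responsibility mirroring the shape of setupRequestProcessors.
public class ChainSketch {
    interface Processor { void processRequest(String req, List<String> trace); }

    static class Stage implements Processor {
        final String name; final Processor next;
        Stage(String name, Processor next) { this.name = name; this.next = next; }
        public void processRequest(String req, List<String> trace) {
            trace.add(name);                               // stand-in for real work
            if (next != null) next.processRequest(req, trace);
        }
    }

    static List<String> run(String req) {
        // wired back-to-front, tail first, exactly like setupRequestProcessors
        Processor fin = new Stage("final", null);
        Processor sync = new Stage("sync", fin);
        Processor prep = new Stage("prep", sync);
        List<String> trace = new ArrayList<>();
        prep.processRequest(req, trace);
        return trace;
    }
}
```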

protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
        ((SyncRequestProcessor) syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
}
public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
PrepRequestProcessor

PrepRequestProcessor's main job is validation and transaction creation.

Its processRequest is likewise decoupled through a queue and runs asynchronously on the processor's own thread; the core method is pRequestHelper.

public void processRequest(Request request) {
        request.prepQueueStartTime = Time.currentElapsedTime();
        submittedRequests.add(request);
        ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
public void run() {
        LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
        try {
            while (true) {
                Request request = submittedRequests.take();
                ... // unimportant logic elided
                pRequest(request);
            }
        }
protected void pRequest(Request request) throws RequestProcessorException {
        request.setHdr(null);
        request.setTxn(null);
        // normally isThrottled is false, so pRequestHelper runs
        if (!request.isThrottled()) {
            // pre-validation, transaction creation, and so on
          pRequestHelper(request);
        }
        // fetch the transaction id
        request.zxid = zks.getZxid();
        long timeFinishedPrepare = Time.currentElapsedTime();
        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
        // handled next by SyncRequestProcessor
        nextProcessor.processRequest(request);
        ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
    }

pRequestHelper dispatches to a handler per OpCode, but the handlers fall into two broad categories: those that create a transaction and those that don't.

Handlers that don't create a transaction only run checkSession, which verifies that the session is still valid.

Handlers that do create a transaction look much alike: most wrap the payload into a Request object and then call pRequest2Txn.

pRequest2Txn validates the request, checks ACL permissions, validates the path, checks the [quota](Zookeeper笔记之quota - CC11001100 - 博客园), creates the transaction, and computes the transaction digest.

// validate the request
        validateCreateRequest(path, createMode, request, ttl);
// ACL permission check
        zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
// validate that the path name is well formed
        validatePath(path, request.sessionId);
// check node-count and byte quotas
        zks.checkQuota(path, null, data, OpCode.create);
// create the transaction
request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
// transaction digest
setTxnDigest(request, nodeRecord.precalculatedDigest);
SyncRequestProcessor

SyncRequestProcessor does two things: it writes transactions to the log and it takes snapshots.

For non-transactional operations this processor is effectively a pass-through; only transactional operations get persisted to the log.
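The batching strategy in the run loop below amounts to a group commit: transactional requests accumulate in toFlush and are flushed together, while a read that arrives with no pending writes bypasses the batch entirely. A toy version of that decision (the threshold and names are illustrative, not ZooKeeper's):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy sketch of SyncRequestProcessor's group-commit batching.
public class GroupCommitSketch {
    static final int MAX_BATCH = 3;                     // illustrative batch size
    final Deque<String> toFlush = new ArrayDeque<>();
    final List<String> flushedBatches = new ArrayList<>();
    final List<String> passedThrough = new ArrayList<>();

    void submit(String req, boolean isTxn) {
        if (!isTxn && toFlush.isEmpty()) {
            passedThrough.add(req);   // read-heavy fast path: nothing pending, skip the batch
            return;
        }
        toFlush.add(req);             // reads behind pending writes must queue too
        if (toFlush.size() >= MAX_BATCH) flush();
    }

    void flush() {                    // one disk sync covers the whole batch
        if (toFlush.isEmpty()) return;
        flushedBatches.add(String.join(",", toFlush));
        toFlush.clear();
    }
}
```

Note the subtlety the real code also has: a read that arrives while writes are pending cannot jump the queue, because its answer must reflect the writes ahead of it.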

public void run() {
        try {
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            resetSnapshotStats();
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());

                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                // poll first so that an empty queuedRequests gives us a chance to flush
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                if (si == null) {
                    /* We timed out looking for more writes to batch, go ahead and flush immediately */
                    flush();
                    si = queuedRequests.take();
                }

                if (si == REQUEST_OF_DEATH) {
                    break;
                }

                long startProcessTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

                // plain reads carry no transaction, so append returns false for them; append only
                // writes to the stream, and nothing hits disk until commit's flush
                if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                    if (shouldSnapshot()) {
                        resetSnapshotStats();
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (!snapThreadMutex.tryAcquire()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally {
                                        snapThreadMutex.release();
                                    }
                                }
                            }.start();
                        }
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read or a throttled request(which doesn't need to be written to the disk),
                    // and there are no pending flushes (writes), then just pass this to the next processor
                    // nothing is pending flush, so hand straight to the next processor; if toFlush
                    // had entries, this request would have to queue behind them
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                // queue for the next flush
                toFlush.add(si);
                if (shouldFlush()) {
                    flush();
                }
                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }

The transaction-writing methods are append and flush:

public synchronized boolean append(Request request) throws IOException {
        TxnHeader hdr = request.getHdr();
        // plain reads have no transaction header, so return false
        if (hdr == null) {
            return false;
        }
        ...
        // logStream == null means a new log file must be created: either the very
        // first one, or the previous file reached its size limit
        if (logStream == null) {
            LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));

            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            long dataSize = oa.getDataSize();
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            // the header of a new log file is flushed immediately
            logStream.flush();
            filePosition += oa.getDataSize() - dataSize;
            filePadding.setCurrentSize(filePosition);
            // flushed later, in commit
            streamsToFlush.add(fos);
        }
        fileSize = filePadding.padFile(fos.getChannel(), filePosition);
        // serialize the request
        byte[] buf = request.getSerializeData();
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " + "and txn");
        }
        long dataSize = oa.getDataSize();
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        // write the request bytes to the stream
        Util.writeTxnBytes(oa, buf);
        unFlushedSize += oa.getDataSize() - dataSize;
        return true;
    }
private void flush() throws IOException, RequestProcessorException {
        if (this.toFlush.isEmpty()) {
            return;
        }

        ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());

        long flushStartTime = Time.currentElapsedTime();
        // flush the transaction log from the stream to disk
        zks.getZKDatabase().commit();
        ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);

        if (this.nextProcessor == null) {
            this.toFlush.clear();
        } else {
            while (!this.toFlush.isEmpty()) {
                final Request i = this.toFlush.remove();
                long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
                // on to FinalRequestProcessor
                this.nextProcessor.processRequest(i);
            }
            if (this.nextProcessor instanceof Flushable) {
                ((Flushable) this.nextProcessor).flush();
            }
        }
        lastFlushTime = Time.currentElapsedTime();
    }

public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();
            filePosition += unFlushedSize;
            // If we have written more than we have previously preallocated,
            // we should override the fileSize by filePosition.
            if (filePosition > fileSize) {
                fileSize = filePosition;
            }
            unFlushedSize = 0;
        }
        for (FileOutputStream log : streamsToFlush) {
            // force to disk
            log.flush();
            ...
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.poll().close();
        }

        // roll to a new transaction log file once the size limit is reached
        if (txnLogSizeLimit > 0) {
            long logSize = getCurrentLogSize();

            if (logSize > txnLogSizeLimit) {
                LOG.debug("Log size limit reached: {}", logSize);
                // create a new file
                rollLog();
            }
        }
    }
FinalRequestProcessor

Up to this point a read request hasn't actually read anything, and a transactional request has only been persisted to the log, without touching the in-memory state. Applying the change in memory, together with assembling the response object, happens in this processor. Two examples follow: creating a node and reading data.

  1. Creating a node (create)

    Creating a node is a transactional operation, so the in-memory state must be updated before the response is returned. The core method is applyRequest, which drills all the way down to DataTree's processTxn.

    public void processRequest(Request request) {
            LOG.debug("Processing request:: {}", request);
            ProcessTxnResult rc = null;
            if (!request.isThrottled()) {
              // apply the transaction or session change
              rc = applyRequest(request);
            }
    

    The execution result is wrapped into a response and sent back.

  2. Reading data (getData)

    Reading data is not transactional, so applyRequest does essentially nothing for it; control falls through to the switch below, which builds the request, fetches the node from the ZKDatabase, and wraps the data into the response.

SessionTracker

A session is a logical long-lived conversation between client and server, carried on top of a physical connection. When a client first connects, ZooKeeper creates a session for it and assigns an expiry time. As long as the expiry time hasn't passed, the conversation counts as the same session even if the connection drops, provided the client manages to reconnect.

ZooKeeper builds features such as ephemeral nodes on top of sessions: when a session expires, all ephemeral nodes created under it are deleted.

The SessionTracker is likewise created when the ZooKeeperServer starts.

Session expiry works the same way as connection expiry; I'll walk through the logic with sessions here, and the connection-expiry part above then follows directly.

public void run() {
        try {
            while (running) {
                long waitTime = sessionExpiryQueue.getWaitTime();
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                    continue;
                }
                // looks like a plain poll, but it actually drains the already-expired bucket from expiryMap
                for (SessionImpl s : sessionExpiryQueue.poll()) {
                    ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
                    setSessionClosing(s.sessionId);
                    expirer.expire(s); // effectively a close
                }
            }
        }
    }

The heart of the expiry logic is the ExpiryQueue, whose key methods are update and poll.

ExpiryQueue holds two maps, elemMap and expiryMap.

elemMap's key is the connection object (the session here, or the connection in the connection-expiry case above) and its value is that object's expiry time.

expiryMap's key is an expiry time and its value is the set of objects expiring at that time, roughly as pictured below.

ExpiryQueue also tracks the next deadline, nextExpirationTime. poll therefore just checks whether that deadline has passed; if it has, it advances the deadline by one interval and removes and returns the bucket of elements that expired at the old deadline, which the caller then closes.

public Set<E> poll() {
        long now = Time.currentElapsedTime();
        // the next deadline at which something expires
        long expirationTime = nextExpirationTime.get();
        // deadline not reached yet, so nothing has expired
        if (now < expirationTime) {
            return Collections.emptySet();
        }

        Set<E> set = null;
        // advance the deadline by one interval
        long newExpirationTime = expirationTime + expirationInterval;
        if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
            // take the bucket at the deadline that just passed
            set = expiryMap.remove(expirationTime);
        }
        if (set == null) {
            return Collections.emptySet();
        }
        return set;
    }

update is also easy to follow: it renews the element's lease. If the renewal moves the element into a later bucket, it is removed from the old bucket and added to the new one.

public Long update(E elem, int timeout) {
        // the element's previous expiry time
        Long prevExpiryTime = elemMap.get(elem);
        long now = Time.currentElapsedTime();
        // renew: round the new deadline up to the next interval boundary
        Long newExpiryTime = roundToNextInterval(now + timeout);
        // if the renewal lands in the same bucket, expiryMap needs no change
        if (newExpiryTime.equals(prevExpiryTime)) {
            // No change, so nothing to update
            return null;
        }
        // otherwise add the session to its new bucket and remove it from the old one
        // First add the elem to the new expiry time bucket in expiryMap.
        Set<E> set = expiryMap.get(newExpiryTime);
        if (set == null) {
            // Construct a ConcurrentHashSet using a ConcurrentHashMap
            set = Collections.newSetFromMap(new ConcurrentHashMap<>());
            // Put the new set in the map, but only if another thread
            // hasn't beaten us to it
            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
            if (existingSet != null) {
                set = existingSet;
            }
        }
        set.add(elem);

        // Map the elem to the new expiry time. If a different previous
        // mapping was present, clean up the previous expiry bucket.
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiryTime;
    }

update is invoked from touchSession, which is called before the processors handle a request.
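Putting poll and update together, the bucketing can be sketched as a single-threaded class: deadlines are rounded up to interval boundaries, elements live in per-deadline buckets, and poll drains one bucket when its deadline passes. ZooKeeper's real ExpiryQueue is concurrent; the sketch below strips that away to show just the bucketing math, with names of my choosing.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Single-threaded sketch of ExpiryQueue's time-bucketed expiry.
public class ExpiryQueueSketch<E> {
    final long interval;                                 // bucket width, e.g. tickTime
    long nextExpirationTime;
    final Map<E, Long> elemMap = new HashMap<>();        // elem -> its deadline
    final Map<Long, Set<E>> expiryMap = new HashMap<>(); // deadline -> elems in that bucket

    public ExpiryQueueSketch(long interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = roundToNextInterval(now);
    }

    long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;         // round UP to the next boundary
    }

    // Renew elem's lease; move it between buckets if its deadline changed.
    public void update(E elem, long now, long timeout) {
        long newExpiry = roundToNextInterval(now + timeout);
        Long prev = elemMap.put(elem, newExpiry);
        if (prev != null && !prev.equals(newExpiry)) {
            Set<E> old = expiryMap.get(prev);
            if (old != null) old.remove(elem);           // leave the old bucket
        }
        expiryMap.computeIfAbsent(newExpiry, k -> new HashSet<>()).add(elem);
    }

    // Drain the bucket at the current deadline once it has passed.
    public Set<E> poll(long now) {
        if (now < nextExpirationTime) return new HashSet<>();
        long expired = nextExpirationTime;
        nextExpirationTime += interval;                  // advance one interval
        Set<E> set = expiryMap.remove(expired);
        return set == null ? new HashSet<>() : set;
    }
}
```

The bucketing is what makes renewals cheap: touching a session usually lands it in the bucket it is already in, so most update calls change nothing.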

References:

【图解源码】Zookeeper3.7源码分析,包含服务启动流程源码、网络通信源码、RequestProcessor处理请求源码 - 掘金

【图解源码】Zookeeper3.7源码剖析,Session的管理机制,Leader选举投票规则,集群数据同步流程 - 掘金

Zookeeper源码分析 | Coding Tree

Zookeeper笔记之quota - CC11001100 - 博客园