Zookeeper源码分析-Zab协议

基本概念

建议读一下20 共识算法:一次性说清楚 Paxos、Raft 等算法的区别,里面讲的还算易懂。

ZAB是zookeeper专门为了保持分布式共识而实现的算法,算法的基石还是Paxos的原理,或者说Mluti Paxos的原理。和Raft算法很像,都具有leader的概念,消息都是通过leader去统一管理,其他节点只需要从leader同步消息即可,并且都是超过半数提交事务就认为成功了。

如果对分布式算法完全陌生,建议了解一下Paxos,MlutiPaxos,Raft的基本概念。

在ZAB协议中,其设计主要包括两部分:原子广播和崩溃恢复。

广播的意思是指当节点收到消息后,要将消息广播给其他的节点。那什么叫做原子呢?原子的意思是在这次广播中,要么所有的节点都收到消息并进行广播;要么所有的节点都放弃该条消息不做广播。

但仅是广播并不能保证所有节点的数据完全一致,还需要保证消息必须有序的处理,每个节点必须处理完上一个消息,才能处理下一条消息。

这是原子广播的两个基本要求:原子和有序。

当leader挂掉的时候,新的leader必须尽快了解并统一整个集群当前的进度。因此,在zk选举流程 中,可以理解为什么必须选择最大事务ID作为leader,因为只有这样,才能在当前节点拥有最全的事务日志。当拥有了最全的事务后,从节点去进行同步到最新的进度。假如原来的leader又恢复了,那么就需要原来的leader丢弃超出现有leader的事务。这就是ZAB的另一部分–崩溃恢复。

整体设计

所有的节点分为了Leader、Follower、Observer三种,其中Leader和Follower是有投票权的。Observer是一个只处理只读请求的节点,属于作为提交吞吐量的横向扩展机制。

之前的选举机制中也提到过,当选举结束之后,每个节点会更新自己的状态,当进入下一次循环的时候,会走到各自的case下执行各自的逻辑。

虽然每个节点只有两行方法,但背后的逻辑还是蛮复杂的,make***方法是构建Zookeeper,可以理解为原子广播部分的逻辑。

而lead、followLeader和observeLeader则是启动部分,主要是崩溃恢复的逻辑。下面我们以leader和follower串一下这个流程。

switch (state) {
    case OBSERVING:
        LOG.info("OBSERVING");
        setObserver(makeObserver(logFactory));
        observer.observeLeader();
        break;
    case FOLLOWING:
        LOG.info("FOLLOWING");
        setFollower(makeFollower(logFactory));
        follower.followLeader();
        break;
    case LEADING:
        LOG.info("LEADING");
        setLeader(makeLeader(logFactory));
        leader.lead();

崩溃恢复

执行到leader.lead()时,leader首先要保证集群尽快统一集群的进度。为了更好的理解这部分,需要了解ZabState类。

    public enum ZabState {
        ELECTION,//选举状态,处于选举时的状态,到调用leader.lead为止
        DISCOVERY,//发现状态,本质上是leader和其他节点建立连接,统一当前的周期epoch和事务ID,并得到大多数节点的认可
        SYNCHRONIZATION,//同步状态,follow节点根据leader确定的事务ID,多退少补
        BROADCAST//大多数节点的事务已经统一,可以接受来自客户端或者从节点的消息并广播了
    }

ELECTION

整个选举过程都是属于ELECTION状态,具体分析可以看选举机制

DISCOVERY

  1. 在这个过程中,leader和其他的节点需要建立通信;注意,这个和之前选举时建立的连接不同,选举时所建立的连接仅是为了选举工作。而这个连接,才是负责真正的通信。

  2. 建立连接之后,每个节点(不论是leader还是follower)要根据自己的事务id计算新的周期,zxid本身是64位的数值,低32位用来自增,高32位用来表示周期,因此需要将自己的高32+1来作为新的周期。

  3. 当leader获取到最新的周期后,要通知所有节点,当大多数节点都成功应答之后,当前集群的周期就算定下来了。

PS:有点像是皇帝的年号,在当上皇帝后,要告知天下当前是洪武、嘉靖还是崇祯之类的。

建立连接是由LearnerCnxAcceptor类为每个地址都创建了一个LearnerCnxAcceptorHandler来接受连接,并用LearnerHandler来处理socket请求

LearnerHandler是非常重要的一个类,其维护者leader和普通节点的通信和数据同步,几个步骤的流转也都是Learner回应了ACK,LearnerHandler收到之后才能往下推进的。

SYNCHRONIZATION

当所有节点就epoch和zxId协商一致之后,Learner节点就可以开始根据Leader的事务情况进行同步了。

关于数据的同步分为三种类型:

DIFF:差距很小,在这种情况下,leader以request的形式发给Learner就可以了 SNAP:差距过大,需要将日志文件直接发给learner
TRUNC: Learner的事务id比leader的还要大,因此Learner要截断事务

当超过半数节点表示数据已经跟上最新进度之后,leader就会启动服务器,并表示ack的这些节点可以接受客户端工作了,自此进入BROADCAST状态。

BROADCAST

在这个状态下,更核心的其实应该是那些不同类型节点的zkServer处理逻辑,或者说那些RequestProcessor的处理逻辑。而处于Leader、LearnerHandler、Follower者三个类中的逻辑,就只有维护好通信,做好心跳之类的框架工作。

几个阻塞态

在Leader.lead和LearnerHandler中,在zabState流转过程中会调用几个阻塞的方法来等待大部分节点的ACK

getEpochToPropose 获取不同节点的周期提案

waitForEpochAck Leader计算完最大的周期,通知其他节点,等待节点对于周期提案的答复

waitForNewLeaderAck 在已经确定周期的情况下,等待其他节点同步完数据之后,给Leader答复

这三个方法的逻辑是大似相同的,都是由Leader或者LearnerHandler来进行调用,下面以getEpochToPropose为例讲一下逻辑

当Leader调用时,传进来的lastAcceptedEpoch是Leader记录的最后的周期或者半数Learner已经商量好的周期。

当LearnerHandler调用时,是在Leader收到FollowInfo之后,这个时候已经获取到各个Follower的周期信息,因此每个LearnerHandler负责将自己Follower的周期传进来,以计算最大的周期,每个节点投票后,通过connectingFollowers.wait进行阻塞

当半数之上的节点参与投票之后,就定下来当前周期,唤醒所有wait的线程,并将waitingForNewEpoch设置为false,那些还没参与投票的就不用投票了

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
        synchronized (connectingFollowers) {
            if (!waitingForNewEpoch) {
                return epoch;
            }
            //每个节点都会调用getEpochToPropose反法,leader是直接调用,其他节点是在learnerHandler中调用
            //因此,假如从节点的周期大于主节点,主节点也要以大的为主
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;
            }
            //判断是否有投票权
            if (isParticipant(sid)) {
                connectingFollowers.add(sid);
            }
            QuorumVerifier verifier = self.getQuorumVerifier();
            //超过半数就通知所有线程继续执行,否则阻塞在这里
            //这里需要注意一下,仅是校验是否有半数参与投票了,而不是校验投票的最大周期超过半数
            //比如存在7票,顺序分别为3,3,3,4,3,3,5,那么当投票到第二个4的时候,就会确定下来当前周期为4,那些大于4的节点在收到epoch判断自己的周期比提案的周期大之后会断开连接
            //当然了,实际情况不会这么极端,毕竟Leader本身就是最大事务Id才能当选,所以大批量的Follower大于Leader是不可能的,只有老领导才会有可能大于新领导
            if (connectingFollowers.contains(self.getMyId()) && verifier.containsQuorum(connectingFollowers)) {
                waitingForNewEpoch = false;
                self.setAcceptedEpoch(epoch);
                connectingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                if (sid == self.getMyId()) {
                    timeStartWaitForEpoch = start;
                }
                long cur = start;
                long end = start + self.getInitLimit() * self.getTickTime();
                //在等待时间内一直阻塞
                while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
                    connectingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }

启动流程

参考上图走一下代码

Leader.lead()

void lead() throws IOException, InterruptedException {
    //zab状态是服务发现
    self.setZabState(QuorumPeer.ZabState.DISCOVERY);
    self.tick.set(0);
    zk.loadData();

    leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
    //用来和从节点同步数据的连接
    cnxAcceptor = new LearnerCnxAcceptor();
    cnxAcceptor.start();
    //等待所有节点计算出来最大的周期
    long epoch = getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());
    //该周期的事务id从0开始计算
    zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

    synchronized(this) {
        lastProposed = zk.getZxid();
    }
    //newLeaderProposal会在waitForEpochAck用到
    newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

    if ((newLeaderProposal.packet.getZxid() & 0xffffffff L) != 0) {
        LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
    }
    //Q&A 2024/8/18
    // Q: 由于一直没有理解getLastSeenQuorumVerifier是什么场景下的,所以这里也不是很理解
    // A:
    QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
    QuorumVerifier curQV = self.getQuorumVerifier();
    //区分第一次和其他情况,第一次的时候只有curQV,其他情况可能会有lastSeenQV
    if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
        try {
            LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
            QuorumVerifier newQV = self.configFromString(curQV.toString());
            newQV.setVersion(zk.getZxid());
            self.setLastSeenQuorumVerifier(newQV, true);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
    //上一个配置的版本大于当前的版本,就也要添加进去
    if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
        newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    // 大部分节点就周期和zxid协商一致
    waitForEpochAck(self.getMyId(), leaderStateSummary);
    self.setCurrentEpoch(epoch);
    self.setLeaderAddressAndId(self.getQuorumAddress(), self.getMyId());
    //zab状态到达同步状态
    self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);

    try {
        //数据已经同步完成
        waitForNewLeaderAck(self.getMyId(), zk.getZxid());
    } catch (InterruptedException e) {
        shutdown("Waiting for a quorum of followers, only synced with sids: [ " +
            newLeaderProposal.ackSetsToString() +
            " ]");
        HashSet < Long > followerSet = new HashSet < > ();

        for (LearnerHandler f: getLearners()) {
            if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
                followerSet.add(f.getSid());
            }
        }
        boolean initTicksShouldBeIncreased = true;
        for (Proposal.QuorumVerifierAcksetPair qvAckset: newLeaderProposal.qvAcksetPairs) {
            if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                initTicksShouldBeIncreased = false;
                break;
            }
        }
        if (initTicksShouldBeIncreased) {
            LOG.warn("Enough followers present. Perhaps the initTicks need to be increased.");
        }
        return;
    }
    //启动Leader端的Server,启动流程和单机版类似,只是会设置周期和最新事务ID
    startZkServer();
    String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
    if (initialZxid != null) {
        long zxid = Long.parseLong(initialZxid);
        zk.setZxid((zk.getZxid() & 0xffffffff00000000 L) | zxid);
    }

    if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
        self.setZooKeeperServer(zk);
    }
    //到达广播状态
    self.setZabState(QuorumPeer.ZabState.BROADCAST);
    self.adminServer.setZooKeeperServer(zk);
    boolean tickSkip = true;
    // If not null then shutdown this leader
    String shutdownMessage = null;
    //保持法定人数的统计和ping
    while (true) {
        synchronized(this) {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.tickTime / 2;
            while (cur < end) {
                wait(end - cur);
                cur = Time.currentElapsedTime();
            }

            if (!tickSkip) {
                self.tick.incrementAndGet();
            }
            SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
            syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
            if (self.getLastSeenQuorumVerifier() != null &&
                self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }

            syncedAckSet.addAck(self.getMyId());

            for (LearnerHandler f: getLearners()) {
                if (f.synced()) {
                    syncedAckSet.addAck(f.getSid());
                }
            }

            // check leader running status
            if (!this.isRunning()) {
                // set shutdown flag
                shutdownMessage = "Unexpected internal error";
                break;
            }


            //状态不健康时
            if (!tickSkip && !syncedAckSet.hasAllQuorums() &&
                !(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers()) && self.getQuorumVerifier().revalidateOutstandingProp(this, new ArrayList < > (outstandingProposals.values()), lastCommitted))) {
                // Lost quorum of last committed and/or last proposed
                // config, set shutdown flag
                shutdownMessage = "Not sufficient followers synced, only synced with sids: [ " +
                    syncedAckSet.ackSetsToString() +
                    " ]";
                break;
            }
            tickSkip = !tickSkip;
        }
        for (LearnerHandler f: getLearners()) {
            f.ping();
        }
    }
    if (shutdownMessage != null) {
        shutdown(shutdownMessage);
        // leader goes in looking state
    }
}

LearnHandler.run()

try {
    //ia是socket的输入,oa是socket的输出
    ia = BinaryInputArchive.getArchive(bufferedInput);
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    oa = BinaryOutputArchive.getArchive(bufferedOutput);
    //等于是从输入流中读取QuorumPacket序列化对象,这个对应的是follower的registerWithLeader方法
    QuorumPacket qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    //判断消息类型是不是FOLLOWERINFO和OBSERVERINFO,即上报节点信息
    messageTracker.trackReceived(qp.getType());
    if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
        LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());

        return;
    }

    ...

    //获取从节点的周期(高32位)
    long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

    long peerLastZxid;
    StateSummary ss = null;
    long zxid = qp.getZxid();
    long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
    long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
    //Q&A 2024/8/18
    // Q: 小于一万的意义?
    // A:
    if (this.getVersion() < 0x10000) {
        // we are going to have to extrapolate the epoch information
        long epoch = ZxidUtils.getEpochFromZxid(zxid);
        ss = new StateSummary(epoch, zxid);
        // fake the message
        learnerMaster.waitForEpochAck(this.getSid(), ss);
    } else {
        byte[] ver = new byte[4];
        ByteBuffer.wrap(ver).putInt(0x10000);
        QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
        oa.writeRecord(newEpochPacket, "packet");
        //将商议出来的周期和zxid发给客户端
        messageTracker.trackSent(Leader.LEADERINFO);
        bufferedOutput.flush();
        QuorumPacket ackEpochPacket = new QuorumPacket();
        ia.readRecord(ackEpochPacket, "packet");
        //客户端响应成功
        messageTracker.trackReceived(ackEpochPacket.getType());
        if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
            LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
            return;
        }
        ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
        ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
        learnerMaster.waitForEpochAck(this.getSid(), ss);
    }
    peerLastZxid = ss.getLastZxid();

    //计算节点和leader相差是否很多
    //三种类型:
    //DIFF:差距很小
    //SNAP:差距过大,需要将日志文件直接发给learner
    //TRUNC: Learner的事务id比leader的还要大,因此Learner要截断事务
    boolean needSnap = syncFollower(peerLastZxid, learnerMaster);

    boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
    /* if we are not truncating or sending a diff just send a snapshot */
    if (needSnap) {
        syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
        syncThrottler.beginSync(exemptFromThrottle);
        ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
        try {
            //对于需要同步日志文件的,先发一个通知
            long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
            oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
            messageTracker.trackSent(Leader.SNAP);
            bufferedOutput.flush();

            // Dump data to peer
            //直接将日志dump发给follow
            learnerMaster.getZKDatabase().serializeSnapshot(oa);
            oa.writeString("BenWasHere", "signature");
            bufferedOutput.flush();
        } finally {
            ServerMetrics.getMetrics().SNAP_COUNT.add(1);
        }
    } else {
        syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
        syncThrottler.beginSync(exemptFromThrottle);
        ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
        ServerMetrics.getMetrics().DIFF_COUNT.add(1);
    }
    ...

    // Start thread that blast packets in the queue to learner
    //单独一个线程专门同步
    startSendingPackets();

    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    //收到Learner的ACK
    messageTracker.trackReceived(qp.getType());
    if (qp.getType() != Leader.ACK) {
        LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
        return;
    }

    LOG.debug("Received NEWLEADER-ACK message from {}", sid);
    //同步完了
    learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
    ...
    /*
     * Wait until learnerMaster starts up
     */
    learnerMaster.waitForStartup();
    //leader向follower发送UPTODATE,表明follower可以开始响应客户端了
    queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
    //接受follower消息,主要是两类吧,一类是request,一类是ack
    while (true) {
        自己看吧 没啥特殊的
    }
}

Follower.followLeader()

self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer leaderServer = findLeader();
try {
    //建立连接
    connectToLeader(leaderServer.addr, leaderServer.hostname);
    connectionTime = System.currentTimeMillis();
    //告诉leader自己的信息,并等待协商好的最新事务Id返回
    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
    if (self.isReconfigStateChange()) {
        throw new Exception("learned about role change");
    }
    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
    if (newEpoch < self.getAcceptedEpoch()) {
        LOG.error("Proposed leader epoch " +
            ZxidUtils.zxidToString(newEpochZxid) +
            " is less than our accepted epoch " +
            ZxidUtils.zxidToString(self.getAcceptedEpoch()));
        throw new IOException("Error: Epoch of leader is lower");
    }
    long startTime = Time.currentElapsedTime();
    self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
    self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
    //同步数据
    syncWithLeader(newEpochZxid);
    self.setZabState(QuorumPeer.ZabState.BROADCAST);
    completedSync = true;
    long syncTime = Time.currentElapsedTime() - startTime;
    ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
    if (self.getObserverMasterPort() > 0) {
        LOG.info("Starting ObserverMaster");

        om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
        om.start();
    } else {
        om = null;
    }
    // create a reusable packet to reduce gc impact
    QuorumPacket qp = new QuorumPacket();
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }

消息处理流程

RequestProcessor

相比于单机版来说,集群的Processor显得有些多,但是像是PrepRequestProcessor、SyncRequestProcessor和FinnalProcessor都是复用,逻辑和zookeeper源码-单机版一样,这里就不赘述了。

LeaderRequestProcessor只是做了一个checkUpgradeSession,处理本地会话创建临时节点的情况。

PropsoalRequestProcessor 的工作就是将请求以提案的方法发给各个节点,并且本地落库

CommitProcessor 用于处理本地事务和总事务的匹配处理,这个类的代码是我觉得最难理解的

ToBeAppliedRequestProcessor 好像只是一个队列排序,没感知到实际作用

AckRequestProcessor 用于判断提案是否通过,通过的话给其他节点发送commit消息,并修改leader的LastZxid

FollowerRequestProcessor 的作用是 判断是否需要是只读请求,只读的话自己交给下一个Processor处理,如果是写事务的话则需要像Leader发request

SendAckRequestProcessor 是在收到leader的提案执行了SyncRequestProcessor(本地落库)之后调用的,用于回复Leader提案ACK

流程图

CommitProcessor

通信流程中其他部分的代码都比较好理解,只有CommitProcessor的run方法,我个人看了好几遍才懂,所以这里贴一下代码分析。

CommitProcessor持有四个队列:

queuedRequests 存储所有请求,无论读写,这个队列的作用主要是规定了顺序。

pendingRequests 表示正在等待处理的请求,是queuedRequests以sessionId分组后的结果,实际存储的请求可能比queuedRequests少,因为queuedRequests中有部分读请求是可以立即处理的。

queuedWriteRequests 只存储事务请求

committedRequests 表示已经提交的事务请求,在调用Commit之后才会添加进来。

这个类的执行逻辑是:

  1. 在processRequest中将所有请求添加到queuedRequests,事务请求添加到queuedRequests中

  2. 当queuedRequests或者committedRequests任意不为空的时候,执行run里的循环逻辑

  3. 如果是queuedRequests不为空,顺序出队;

    如果出队的是事务请求,添加到pendingRequests中

    如果出队的是只读请求,且正在处理的请求中不包含这个只读请求的SessionId,立即交由下一个Processor处理

    如果出队的是只读请求,但是这个session之前有写请求,那么这个只读请求也得添加到pendingRequests中,强制串行化

  4. 等待commitIsWaiting为真,即存在已经提交的事务请求不为空

    这里要理解一下,目前还为返回结果的请求只有两种,一种是未提交的事务请求,一种是阻塞在事务请求后的同session只读请求,因此想要处理这两种中的任意一个,都需要等待队首的事务提交

  5. 当commitIsWaiting为真之后,下面其实有两段逻辑:while循环是处理事务请求的,for循环是处理阻塞的只读请求的

  6. 从committedRequests取出已提交的请求,并从queuedWriteRequests中匹配,组装完整的request交给下一个Processor处理;并将当前请求的SessionId添加到queuesToDrain,表示里面阻塞的只读请求可以开始处理了

  7. 遍历可以处理的session,从pendingRequests中取出只读请求并进行处理,当遇到事务请求时停止

public void run() {
    int requestsToProcess = 0;
    boolean commitIsWaiting = false;
    do {
        //queuedRequests表示还没有commit的事务,主要用于排队
        //committedRequests表示已经commit的事务
        synchronized(this) {
            commitIsWaiting = !committedRequests.isEmpty();
            requestsToProcess = queuedRequests.size();
            //两个队列都没有表示现在没有请求需要处理
            if (requestsToProcess == 0 && !commitIsWaiting) {
                // Waiting for requests to process
                while (!stopped && requestsToProcess == 0 && !commitIsWaiting) {
                    wait();
                    commitIsWaiting = !committedRequests.isEmpty();
                    requestsToProcess = queuedRequests.size();
                }
            }
        }

        Request request;
        int readsProcessed = 0;
        while (!stopped &&
            requestsToProcess > 0 &&
            (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize) &&
            (request = queuedRequests.poll()) != null) {
            requestsToProcess--;
            //如果是写事务,或者虽然是读事务,但是该sessionId存在正在处理的请求都要放到队列里
            if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) {
                // Add request to pending
                Deque < Request > requests = pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque < > ());
                requests.addLast(request);
                ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
            } else {
                readsProcessed++;
                numReadQueuedRequests.decrementAndGet();
                //是读事务,直接交给Final处理
                sendToNextProcessor(request);
            }

            if (!commitIsWaiting) {
                commitIsWaiting = !committedRequests.isEmpty();
            }

            /*
             * 
             * 存在已提交的事务:意味着在pendingRequests和committedRequests都有request
             */
            if (commitIsWaiting && !stopped) {
                /*
                 * Drain outstanding reads
                 */
                //Q&A 2024/8/19
                // Q: 这里不理解,必须得是空了才能处理吗?
                // A:
                waitForEmptyPool();

                if (stopped) {
                    return;
                }

                int commitsToProcess = maxCommitBatchSize;

                Set < Long > queuesToDrain = new HashSet < > ();
                long startWriteTime = Time.currentElapsedTime();
                int commitsProcessed = 0;
                //处理事务请求
                while (commitIsWaiting && !stopped && commitsToProcess > 0) {

                    // Process committed head
                    request = committedRequests.peek();

                    if (!queuedWriteRequests.isEmpty() &&
                        queuedWriteRequests.peek().sessionId == request.sessionId &&
                        queuedWriteRequests.peek().cxid == request.cxid) {
                        Deque < Request > sessionQueue = pendingRequests.get(request.sessionId);
                        ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
                        //想象不出来什么时候会是这种情况
                        if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
                            break;
                        } else {
                            ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
                            // If session queue != null, then it is also not empty.
                            Request topPending = sessionQueue.poll();
                            topPending.setHdr(request.getHdr());
                            topPending.setTxn(request.getTxn());
                            topPending.setTxnDigest(request.getTxnDigest());
                            topPending.zxid = request.zxid;
                            topPending.commitRecvTime = request.commitRecvTime;
                            request = topPending;
                            if (request.isThrottled()) {
                                LOG.error("Throttled request in committed & pending pool: {}. Exiting.", request);
                                ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
                            }
                            numWriteQueuedRequests.decrementAndGet();
                            //queuedWriteRequests出队
                            queuedWriteRequests.poll();
                            //由于处理了该session的写事务,因此可以处理该session的只读请求了
                            queuesToDrain.add(request.sessionId);
                        }
                    }
                    //出队
                    committedRequests.remove();
                    commitsToProcess--;
                    commitsProcessed++;

                    // Process the write inline.
                    //交由下一个Processor处理
                    processWrite(request);

                    commitIsWaiting = !committedRequests.isEmpty();
                }
                readsProcessed = 0;
                for (Long sessionId: queuesToDrain) {
                    Deque < Request > sessionQueue = pendingRequests.get(sessionId);
                    int readsAfterWrite = 0;
                    //取出对应session的待处理请求,只处理只读请求,当遇到事务请求就停止(得等待该事务被提交后处理)
                    while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) {
                        numReadQueuedRequests.decrementAndGet();
                        sendToNextProcessor(sessionQueue.poll());
                        readsAfterWrite++;
                    }
                    readsProcessed += readsAfterWrite;

                    // Remove empty queues
                    if (sessionQueue.isEmpty()) {
                        pendingRequests.remove(sessionId);
                    }
                }
            }

        }
        while (!stoppedMainLoop);
    }
}

参考文档:

Zab协议 (史上最全) - 疯狂创客圈 - 博客园

【图解源码】Zookeeper3.7源码剖析,Session的管理机制,Leader选举投票规则,集群数据同步流程 - 掘金

《Zookeeper》源码分析(十九)之 LearnerHandler-CSDN博客

20 共识算法:一次性说清楚 Paxos、Raft 等算法的区别