基本概念
建议读一下20 共识算法:一次性说清楚 Paxos、Raft 等算法的区别,里面讲的还算易懂。
ZAB是zookeeper专门为了保持分布式共识而实现的算法,算法的基石还是Paxos的原理,或者说Mluti Paxos的原理。和Raft算法很像,都具有leader的概念,消息都是通过leader去统一管理,其他节点只需要从leader同步消息即可,并且都是超过半数提交事务就认为成功了。
如果对分布式算法完全陌生,建议了解一下Paxos,MlutiPaxos,Raft的基本概念。
在ZAB协议中,其设计主要包括两部分:原子广播和崩溃恢复。
广播的意思是指当节点收到消息后,要将消息广播给其他的节点。那什么叫做原子呢?原子的意思是在这次广播中,要么所有的节点都收到消息并进行广播;要么所有的节点都放弃该条消息不做广播。
但仅是广播并不能保证所有节点的数据完全一致,还需要保证消息必须有序的处理,每个节点必须处理完上一个消息,才能处理下一条消息。
这是原子广播的两个基本要求:原子和有序。
当leader挂掉的时候,新的leader必须尽快了解并统一整个集群当前的进度。因此,在zk选举流程 中,可以理解为什么必须选择最大事务ID作为leader,因为只有这样,才能在当前节点拥有最全的事务日志。当拥有了最全的事务后,从节点去进行同步到最新的进度。假如原来的leader又恢复了,那么就需要原来的leader丢弃超出现有leader的事务。这就是ZAB的另一部分–崩溃恢复。
整体设计
所有的节点分为了Leader、Follower、Observer三种,其中Leader和Follower是有投票权的。Observer是一个只处理只读请求的节点,属于作为提交吞吐量的横向扩展机制。
之前的选举机制中也提到过,当选举结束之后,每个节点会更新自己的状态,当进入下一次循环的时候,会走到各自的case下执行各自的逻辑。
虽然每个节点只有两行方法,但背后的逻辑还是蛮复杂的,make***方法是构建Zookeeper,可以理解为原子广播部分的逻辑。
而lead、followLeader和observeLeader则是启动部分,主要是崩溃恢复的逻辑。下面我们以leader和follower串一下这个流程。
switch (state) {
case OBSERVING:
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
break;
case FOLLOWING:
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
break;
case LEADING:
LOG.info("LEADING");
setLeader(makeLeader(logFactory));
leader.lead();
崩溃恢复
执行到leader.lead()
时,leader首先要保证集群尽快统一集群的进度。为了更好的理解这部分,需要了解ZabState类。
public enum ZabState {
ELECTION,//选举状态,处于选举时的状态,到调用leader.lead为止
DISCOVERY,//发现状态,本质上是leader和其他节点建立连接,统一当前的周期epoch和事务ID,并得到大多数节点的认可
SYNCHRONIZATION,//同步状态,follow节点根据leader确定的事务ID,多退少补
BROADCAST//大多数节点的事务已经统一,可以接受来自客户端或者从节点的消息并广播了
}
ELECTION
整个选举过程都是属于ELECTION状态,具体分析可以看选举机制
DISCOVERY
在这个过程中,leader和其他的节点需要建立通信;注意,这个和之前选举时建立的连接不同,选举时所建立的连接仅是为了选举工作。而这个连接,才是负责真正的通信。
建立连接之后,每个节点(不论是leader还是follower)要根据自己的事务id计算新的周期,zxid本身是64位的数值,低32位用来自增,高32位用来表示周期,因此需要将自己的高32+1来作为新的周期。
当leader获取到最新的周期后,要通知所有节点,当大多数节点都成功应答之后,当前集群的周期就算定下来了。
PS:有点像是皇帝的年号,在当上皇帝后,要告知天下当前是洪武、嘉靖还是崇祯之类的。
建立连接是由LearnerCnxAcceptor类为每个地址都创建了一个LearnerCnxAcceptorHandler来接受连接,并用LearnerHandler来处理socket请求
LearnerHandler是非常重要的一个类,其维护者leader和普通节点的通信和数据同步,几个步骤的流转也都是Learner回应了ACK,LearnerHandler收到之后才能往下推进的。
SYNCHRONIZATION
当所有节点就epoch和zxId协商一致之后,Learner节点就可以开始根据Leader的事务情况进行同步了。
关于数据的同步分为三种类型:
DIFF:差距很小,在这种情况下,leader以request的形式发给Learner就可以了
SNAP:差距过大,需要将日志文件直接发给learner
TRUNC: Learner的事务id比leader的还要大,因此Learner要截断事务
当超过半数节点表示数据已经跟上最新进度之后,leader就会启动服务器,并表示ack的这些节点可以接受客户端工作了,自此进入BROADCAST状态。
BROADCAST
在这个状态下,更核心的其实应该是那些不同类型节点的zkServer处理逻辑,或者说那些RequestProcessor的处理逻辑。而处于Leader、LearnerHandler、Follower者三个类中的逻辑,就只有维护好通信,做好心跳之类的框架工作。
几个阻塞态
在Leader.lead和LearnerHandler中,在zabState流转过程中会调用几个阻塞的方法来等待大部分节点的ACK
getEpochToPropose 获取不同节点的周期提案
waitForEpochAck Leader计算完最大的周期,通知其他节点,等待节点对于周期提案的答复
waitForNewLeaderAck 在已经确定周期的情况下,等待其他节点同步完数据之后,给Leader答复
这三个方法的逻辑是大似相同的,都是由Leader或者LearnerHandler来进行调用,下面以getEpochToPropose为例讲一下逻辑
当Leader调用时,传进来的lastAcceptedEpoch是Leader记录的最后的周期或者半数Learner已经商量好的周期。
当LearnerHandler调用时,是在Leader收到FollowInfo之后,这个时候已经获取到各个Follower的周期信息,因此每个LearnerHandler负责将自己Follower的周期传进来,以计算最大的周期,每个节点投票后,通过connectingFollowers.wait进行阻塞
当半数之上的节点参与投票之后,就定下来当前周期,唤醒所有wait的线程,并将waitingForNewEpoch设置为false,那些还没参与投票的就不用投票了
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized (connectingFollowers) {
if (!waitingForNewEpoch) {
return epoch;
}
//每个节点都会调用getEpochToPropose反法,leader是直接调用,其他节点是在learnerHandler中调用
//因此,假如从节点的周期大于主节点,主节点也要以大的为主
if (lastAcceptedEpoch >= epoch) {
epoch = lastAcceptedEpoch + 1;
}
//判断是否有投票权
if (isParticipant(sid)) {
connectingFollowers.add(sid);
}
QuorumVerifier verifier = self.getQuorumVerifier();
//超过半数就通知所有线程继续执行,否则阻塞在这里
//这里需要注意一下,仅是校验是否有半数参与投票了,而不是校验投票的最大周期超过半数
//比如存在7票,顺序分别为3,3,3,4,3,3,5,那么当投票到第二个4的时候,就会确定下来当前周期为4,那些大于4的节点在收到epoch判断自己的周期比提案的周期大之后会断开连接
//当然了,实际情况不会这么极端,毕竟Leader本身就是最大事务Id才能当选,所以大批量的Follower大于Leader是不可能的,只有老领导才会有可能大于新领导
if (connectingFollowers.contains(self.getMyId()) && verifier.containsQuorum(connectingFollowers)) {
waitingForNewEpoch = false;
self.setAcceptedEpoch(epoch);
connectingFollowers.notifyAll();
} else {
long start = Time.currentElapsedTime();
if (sid == self.getMyId()) {
timeStartWaitForEpoch = start;
}
long cur = start;
long end = start + self.getInitLimit() * self.getTickTime();
//在等待时间内一直阻塞
while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
connectingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
if (waitingForNewEpoch) {
throw new InterruptedException("Timeout while waiting for epoch from quorum");
}
}
return epoch;
}
}
启动流程
参考上图走一下代码
Leader.lead()
void lead() throws IOException, InterruptedException {
//zab状态是服务发现
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
//用来和从节点同步数据的连接
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
//等待所有节点计算出来最大的周期
long epoch = getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());
//该周期的事务id从0开始计算
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized(this) {
lastProposed = zk.getZxid();
}
//newLeaderProposal会在waitForEpochAck用到
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
if ((newLeaderProposal.packet.getZxid() & 0xffffffff L) != 0) {
LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
}
//Q&A 2024/8/18
// Q: 由于一直没有理解getLastSeenQuorumVerifier是什么场景下的,所以这里也不是很理解
// A:
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
QuorumVerifier curQV = self.getQuorumVerifier();
//区分第一次和其他情况,第一次的时候只有curQV,其他情况可能会有lastSeenQV
if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
try {
LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
QuorumVerifier newQV = self.configFromString(curQV.toString());
newQV.setVersion(zk.getZxid());
self.setLastSeenQuorumVerifier(newQV, true);
} catch (Exception e) {
throw new IOException(e);
}
}
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
//上一个配置的版本大于当前的版本,就也要添加进去
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// 大部分节点就周期和zxid协商一致
waitForEpochAck(self.getMyId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getMyId());
//zab状态到达同步状态
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
try {
//数据已经同步完成
waitForNewLeaderAck(self.getMyId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ " +
newLeaderProposal.ackSetsToString() +
" ]");
HashSet < Long > followerSet = new HashSet < > ();
for (LearnerHandler f: getLearners()) {
if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
followerSet.add(f.getSid());
}
}
boolean initTicksShouldBeIncreased = true;
for (Proposal.QuorumVerifierAcksetPair qvAckset: newLeaderProposal.qvAcksetPairs) {
if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
initTicksShouldBeIncreased = false;
break;
}
}
if (initTicksShouldBeIncreased) {
LOG.warn("Enough followers present. Perhaps the initTicks need to be increased.");
}
return;
}
//启动Leader端的Server,启动流程和单机版类似,只是会设置周期和最新事务ID
startZkServer();
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000 L) | zxid);
}
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.setZooKeeperServer(zk);
}
//到达广播状态
self.setZabState(QuorumPeer.ZabState.BROADCAST);
self.adminServer.setZooKeeperServer(zk);
boolean tickSkip = true;
// If not null then shutdown this leader
String shutdownMessage = null;
//保持法定人数的统计和ping
while (true) {
synchronized(this) {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.tickTime / 2;
while (cur < end) {
wait(end - cur);
cur = Time.currentElapsedTime();
}
if (!tickSkip) {
self.tick.incrementAndGet();
}
SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null &&
self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
syncedAckSet.addAck(self.getMyId());
for (LearnerHandler f: getLearners()) {
if (f.synced()) {
syncedAckSet.addAck(f.getSid());
}
}
// check leader running status
if (!this.isRunning()) {
// set shutdown flag
shutdownMessage = "Unexpected internal error";
break;
}
//状态不健康时
if (!tickSkip && !syncedAckSet.hasAllQuorums() &&
!(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers()) && self.getQuorumVerifier().revalidateOutstandingProp(this, new ArrayList < > (outstandingProposals.values()), lastCommitted))) {
// Lost quorum of last committed and/or last proposed
// config, set shutdown flag
shutdownMessage = "Not sufficient followers synced, only synced with sids: [ " +
syncedAckSet.ackSetsToString() +
" ]";
break;
}
tickSkip = !tickSkip;
}
for (LearnerHandler f: getLearners()) {
f.ping();
}
}
if (shutdownMessage != null) {
shutdown(shutdownMessage);
// leader goes in looking state
}
}
LearnHandler.run()
try {
//ia是socket的输入,oa是socket的输出
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
//等于是从输入流中读取QuorumPacket序列化对象,这个对应的是follower的registerWithLeader方法
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
//判断消息类型是不是FOLLOWERINFO和OBSERVERINFO,即上报节点信息
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());
return;
}
...
//获取从节点的周期(高32位)
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
//Q&A 2024/8/18
// Q: 小于一万的意义?
// A:
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
learnerMaster.waitForEpochAck(this.getSid(), ss);
} else {
byte[] ver = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
//将商议出来的周期和zxid发给客户端
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
//客户端响应成功
messageTracker.trackReceived(ackEpochPacket.getType());
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
learnerMaster.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
//计算节点和leader相差是否很多
//三种类型:
//DIFF:差距很小
//SNAP:差距过大,需要将日志文件直接发给learner
//TRUNC: Learner的事务id比leader的还要大,因此Learner要截断事务
boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
try {
//对于需要同步日志文件的,先发一个通知
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush();
// Dump data to peer
//直接将日志dump发给follow
learnerMaster.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
}
} else {
syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
ServerMetrics.getMetrics().DIFF_COUNT.add(1);
}
...
// Start thread that blast packets in the queue to learner
//单独一个线程专门同步
startSendingPackets();
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
//收到Learner的ACK
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
return;
}
LOG.debug("Received NEWLEADER-ACK message from {}", sid);
//同步完了
learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
...
/*
* Wait until learnerMaster starts up
*/
learnerMaster.waitForStartup();
//leader向follower发送UPTODATE,表明follower可以开始响应客户端了
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
//接受follower消息,主要是两类吧,一类是request,一类是ack
while (true) {
自己看吧, 没啥特殊的
}
}
Follower.followLeader()
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer leaderServer = findLeader();
try {
//建立连接
connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime = System.currentTimeMillis();
//告诉leader自己的信息,并等待协商好的最新事务Id返回
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
}
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " +
ZxidUtils.zxidToString(newEpochZxid) +
" is less than our accepted epoch " +
ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
long startTime = Time.currentElapsedTime();
self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
//同步数据
syncWithLeader(newEpochZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
long syncTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
if (self.getObserverMasterPort() > 0) {
LOG.info("Starting ObserverMaster");
om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
om.start();
} else {
om = null;
}
// create a reusable packet to reduce gc impact
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
消息处理流程
RequestProcessor
相比于单机版来说,集群的Processor显得有些多,但是像是PrepRequestProcessor、SyncRequestProcessor和FinnalProcessor都是复用,逻辑和zookeeper源码-单机版一样,这里就不赘述了。
LeaderRequestProcessor只是做了一个checkUpgradeSession,处理本地会话创建临时节点的情况。
PropsoalRequestProcessor 的工作就是将请求以提案的方法发给各个节点,并且本地落库
CommitProcessor 用于处理本地事务和总事务的匹配处理,这个类的代码是我觉得最难理解的
ToBeAppliedRequestProcessor 好像只是一个队列排序,没感知到实际作用
AckRequestProcessor 用于判断提案是否通过,通过的话给其他节点发送commit消息,并修改leader的LastZxid
FollowerRequestProcessor 的作用是 判断是否需要是只读请求,只读的话自己交给下一个Processor处理,如果是写事务的话则需要像Leader发request
SendAckRequestProcessor 是在收到leader的提案执行了SyncRequestProcessor(本地落库)之后调用的,用于回复Leader提案ACK
流程图
CommitProcessor
通信流程中其他部分的代码都比较好理解,只有CommitProcessor的run方法,我个人看了好几遍才懂,所以这里贴一下代码分析。
CommitProcessor持有四个队列:
queuedRequests 存储所有请求,无论读写,这个队列的作用主要是规定了顺序。
pendingRequests 表示正在等待处理的请求,是queuedRequests以sessionId分组后的结果,实际存储的请求可能比queuedRequests少,因为queuedRequests中有部分读请求是可以立即处理的。
queuedWriteRequests 只存储事务请求
committedRequests 表示已经提交的事务请求,在调用Commit之后才会添加进来。
这个类的执行逻辑是:
在processRequest中将所有请求添加到queuedRequests,事务请求添加到queuedRequests中
当queuedRequests或者committedRequests任意不为空的时候,执行run里的循环逻辑
如果是queuedRequests不为空,顺序出队;
如果出队的是事务请求,添加到pendingRequests中
如果出队的是只读请求,且正在处理的请求中不包含这个只读请求的SessionId,立即交由下一个Processor处理
如果出队的是只读请求,但是这个session之前有写请求,那么这个只读请求也得添加到pendingRequests中,强制串行化
等待commitIsWaiting为真,即存在已经提交的事务请求不为空
这里要理解一下,目前还为返回结果的请求只有两种,一种是未提交的事务请求,一种是阻塞在事务请求后的同session只读请求,因此想要处理这两种中的任意一个,都需要等待队首的事务提交
当commitIsWaiting为真之后,下面其实有两段逻辑:while循环是处理事务请求的,for循环是处理阻塞的只读请求的
从committedRequests取出已提交的请求,并从queuedWriteRequests中匹配,组装完整的request交给下一个Processor处理;并将当前请求的SessionId添加到queuesToDrain,表示里面阻塞的只读请求可以开始处理了
遍历可以处理的session,从pendingRequests中取出只读请求并进行处理,当遇到事务请求时停止
public void run() {
int requestsToProcess = 0;
boolean commitIsWaiting = false;
do {
//queuedRequests表示还没有commit的事务,主要用于排队
//committedRequests表示已经commit的事务
synchronized(this) {
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
//两个队列都没有表示现在没有请求需要处理
if (requestsToProcess == 0 && !commitIsWaiting) {
// Waiting for requests to process
while (!stopped && requestsToProcess == 0 && !commitIsWaiting) {
wait();
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
}
}
}
Request request;
int readsProcessed = 0;
while (!stopped &&
requestsToProcess > 0 &&
(maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize) &&
(request = queuedRequests.poll()) != null) {
requestsToProcess--;
//如果是写事务,或者虽然是读事务,但是该sessionId存在正在处理的请求都要放到队列里
if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) {
// Add request to pending
Deque < Request > requests = pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque < > ());
requests.addLast(request);
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
} else {
readsProcessed++;
numReadQueuedRequests.decrementAndGet();
//是读事务,直接交给Final处理
sendToNextProcessor(request);
}
if (!commitIsWaiting) {
commitIsWaiting = !committedRequests.isEmpty();
}
/*
*
* 存在已提交的事务:意味着在pendingRequests和committedRequests都有request
*/
if (commitIsWaiting && !stopped) {
/*
* Drain outstanding reads
*/
//Q&A 2024/8/19
// Q: 这里不理解,必须得是空了才能处理吗?
// A:
waitForEmptyPool();
if (stopped) {
return;
}
int commitsToProcess = maxCommitBatchSize;
Set < Long > queuesToDrain = new HashSet < > ();
long startWriteTime = Time.currentElapsedTime();
int commitsProcessed = 0;
//处理事务请求
while (commitIsWaiting && !stopped && commitsToProcess > 0) {
// Process committed head
request = committedRequests.peek();
if (!queuedWriteRequests.isEmpty() &&
queuedWriteRequests.peek().sessionId == request.sessionId &&
queuedWriteRequests.peek().cxid == request.cxid) {
Deque < Request > sessionQueue = pendingRequests.get(request.sessionId);
ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
//想象不出来什么时候会是这种情况
if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
break;
} else {
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
// If session queue != null, then it is also not empty.
Request topPending = sessionQueue.poll();
topPending.setHdr(request.getHdr());
topPending.setTxn(request.getTxn());
topPending.setTxnDigest(request.getTxnDigest());
topPending.zxid = request.zxid;
topPending.commitRecvTime = request.commitRecvTime;
request = topPending;
if (request.isThrottled()) {
LOG.error("Throttled request in committed & pending pool: {}. Exiting.", request);
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
numWriteQueuedRequests.decrementAndGet();
//queuedWriteRequests出队
queuedWriteRequests.poll();
//由于处理了该session的写事务,因此可以处理该session的只读请求了
queuesToDrain.add(request.sessionId);
}
}
//出队
committedRequests.remove();
commitsToProcess--;
commitsProcessed++;
// Process the write inline.
//交由下一个Processor处理
processWrite(request);
commitIsWaiting = !committedRequests.isEmpty();
}
readsProcessed = 0;
for (Long sessionId: queuesToDrain) {
Deque < Request > sessionQueue = pendingRequests.get(sessionId);
int readsAfterWrite = 0;
//取出对应session的待处理请求,只处理只读请求,当遇到事务请求就停止(得等待该事务被提交后处理)
while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) {
numReadQueuedRequests.decrementAndGet();
sendToNextProcessor(sessionQueue.poll());
readsAfterWrite++;
}
readsProcessed += readsAfterWrite;
// Remove empty queues
if (sessionQueue.isEmpty()) {
pendingRequests.remove(sessionId);
}
}
}
}
while (!stoppedMainLoop);
}
}
参考文档:
【图解源码】Zookeeper3.7源码剖析,Session的管理机制,Leader选举投票规则,集群数据同步流程 - 掘金