1. 会话创建请求
ZooKeeper 服务端对于会话创建的处理,大体可以分为请求接收、会话创建、预处理、事务处理、事务应用和会话响应 6 大环节,其大体流程如图。
1.1 请求接收
1.1.1 I/O 层接收来自客户端的请求
在 ZooKeeper 中,NIOServerCnxnFactory
会在运行过程中为客户端连接创建对应的 NIOServerCnxn
实例,客户端与服务端的所有通信都是由NIOServerCnxn
负责的 —— 其负责统一接收来自客户端的所有请求,并将请求内容从底层网络 I/O 中完整地读取出来,一个客户端连接就对应了一个 NIOServerCnxn
的实例。注意刚创建时 NIOServerCnxn
实例的 initialized
字段为 false。
org.apache.zookeeper.server.NIOServerCnxnFactory#run
1 | public void run() { |
1.1.2 判断是否是客户端会话创建请求
当底层 I/O 有数据可读时,NIOServerCnxnFactory
找到绑定的 NIOServerCnxn
实例,调用其 doIO
方法。这里会做一个判断,若 initialized
字段为 false,则这一定是客户端的第一个请求会话创建请求。
org.apache.zookeeper.server.NIOServerCnxn#readPayload
1 | /** Read the request payload (everything following the length prefix) */ |
1.1.3 反序列化 ConnectRequest 请求
一旦确定客户端请求是会话创建请求,那么服务端就可以对其进行反序列化,并生成一个 ConnectRequest
请求实体。
org.apache.zookeeper.server.NIOServerCnxn#readConnectRequest
1 | private void readConnectRequest() throws IOException, InterruptedException { |
org.apache.zookeeper.server.ZooKeeperServer#processConnectRequest
1 | public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { |
1.1.4 判断是否 ReadOnly 客户端
如果当前 ZooKeeper 客户端是以 ReadOnly 模式启动的,那么所有来自非 ReadOnly 客户端的请求将无法被处理。因此,服务端需要先检查其是否是 ReadOnly 客户端,并以此来决定是否接受该会话创建请求。
1.1.5 检查客户端 ZXID
在正常情况下,同一个 ZooKeeper 集群中,服务端的 ZXID 必定大于客户端的 ZXID,因此如果发现客户端的 ZXID 大于服务端的 ZXID,那么服务端不接受该客户端的会话创建请求。
1.1.6 协商 sessionTimeout
客户端在构造 ZooKeeper 实例时,会有一个 sessionTimeout
参数用于指定会话的超时时间。客户端向服务器发送这个超时时间后,服务器会根据自己的超时时间限制最终确定该会话的超时时间。
默认情况下,ZooKeeper 服务器对超时时间的限制介于 2 个 tickTime
到 20 个 tickTime
之间。
1.1.7 判断是否需要重新创建会话
服务端根据客户端请求中是否包含 sessionID
来判断该客户端是否需要重新创建会话。如果客户端请求中已经包含了 sessionID
,那么就认为该客户端正在进行会话重连。这种情况下,服务端只需要重新打开这个会话,否则需要重新创建。
1.2 会话创建
org.apache.zookeeper.server.ZooKeeperServer#createSession
1 | long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { |
org.apache.zookeeper.server.SessionTrackerImpl#createSession
1 | synchronized public long createSession(int sessionTimeout) { |
1.2.1 为客户端生成 sessionId
在为客户端创建会话之前,服务端首先会为每个客户端都分配一个 sessionId
。 分配方式是通过 SessionTracker
对基准 sessionId
做自增操作。无论客户端连的是哪台服务器,生成的 sessionId
都是全局唯一的。
1.2.2 注册会话
在会话创建初期,会将客户端会话的相关信息保存到 SessionTracker
的 sessionWithTimeout
和 sessionById
中。
org.apache.zookeeper.server.SessionTrackerImpl#addSession
1 | synchronized public void addSession(long id, int sessionTimeout) { |
1.2.3 激活会话
为会话安排一个区块,方便会话清理程序能够快速高效地进行会话清理。
org.apache.zookeeper.server.SessionTrackerImpl#touchSession
1 | synchronized public boolean touchSession(long sessionId, int timeout) { |
1.2.4 生成会话密钥
服务端在创建一个客户端会话时,会同时为客户端生成一个会话密码,连同 sessionId
一起发送给客户端,作为会话在集群中不同机器间转移的凭证。
1.2.5 将请求交给 firstProcessor
org.apache.zookeeper.server.ZooKeeperServer#submitRequest(org.apache.zookeeper.server.ServerCnxn, long, int, int, java.nio.ByteBuffer, java.util.List<org.apache.zookeeper.data.Id>)
1 | private void submitRequest(ServerCnxn cnxn, long sessionId, int type, |
这里的 type
为 createSession
。
org.apache.zookeeper.server.ZooKeeperServer#submitRequest(org.apache.zookeeper.server.Request)
1 | public void submitRequest(Request si) { |
firstProcessor
是一个 RequestProcessor
类型的变量。在提交给 firstProcessor
处理器之前,Zookeeper 会根据该请求所属的会话,进行一次激活会话操作,以确保当前会话处于激活状态,完成会话激活后,则提交请求至 firstProcessor
处理器,放入待处理请求队列中。
到这里 createSession
方法结束,后续流程由 firstProcessor
线程异步处理。
在会话创建请求的处理中,无论客户端连接的是 Leader 还是 Learner,到目前为止的处理流程都是相同的。接下来的差别在于:
- 对于 Leader 服务器,其
firstProcessor
的实现为PrepRequestProcessor
。 - 对于 Follower 服务器,其
firstProcessor
的实现为FollowerRequestProcessor
。 - 对于 Observer 服务器,其
firstProcessor
的实现为ObserverRequestProcessor
。
FollowerRequestProcessor
和 ObserverRequestProcessor
会将事务请求以 REQUEST 消息的形式转发给 Leader 处理。Leader 的 LearnerHandler
在接收到这个消息后,会解析出客户端的原始请求,然后提交到自己的请求处理链中开始进行事务请求的处理。
1.3 事务预处理
1.3.1 异步处理请求
org.apache.zookeeper.server.PrepRequestProcessor#run
1 |
|
org.apache.zookeeper.server.PrepRequestProcessor#pRequest
方法根据请求的类型,将事务类请求交由 org.apache.zookeeper.server.PrepRequestProcessor#pRequest2Txn
方法处理。对一些类型的事务请求,还要生成变更记录放入 outstandingChanges
队列中。
1 | // ... |
1.3.2 创建请求事务头
对于事务请求,ZooKeeper 首先会为其创建请求事务头。请求事务头包含了一个事务请求最基本的一些信息,包括 sessionId
、ZXID、CXID(客户端的操作序列号) 和请求类型等。
1 | public class TxnHeader implements Record { |
1.3.3 创建请求事务体
对于事务请求,ZooKeeper 还会为其创建请求事务体。对应到会话创建请求,对应的事务体实现为 CreateSessionTxn
。
1.3.4 注册与激活会话
此处进行会话注册与激活的目的是处理由非 Leader 服务器转发过来的会话创建请求,在这种情况下,其尚未在 Leader 的 SessionTracker
中进行会话的注册,因此需要在此处进行一次注册与激活。
1.4 事务处理
在 pRequest
方法最后,会将请求提交给 RequestProcessor
类型变量 nextProcessor
处理。对于 Leader,这个变量的实现类为 ProposalRequestProcessor
。
ProposalRequestProcessor
顾名思义是一个与提案相关的处理器。所谓的提案,是 ZooKeeper 中针对事务请求所展开的一个投票流程中对事务操作的包装。从 ProposalRequestProcessor
处理器开始,请求的处理将会同时进入三个子处理流程,分别是 Sync 流程、Proposal 流程和 Commit 流程。
org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
1 | public void processRequest(Request request) throws RequestProcessorException { |
对 Leader 而言:
ProposalRequestProcessor
会首先将请求提交给nextProcessor
,其具体实现是CommitProcessor
。请求被放入CommitProcessor
的 队列queuedRequests
,等待CommitProcessor
的线程异步处理(即等待投票完成),此即 Commit 流程。- 调用 Leader 的
propose
方法,生成Proposal
并广播给 Follower,统计 Follower 返回的投票结果并通知各个 Learner 最终提交事务,此即 Proposal 流程。这个流程会在完成后唤醒 Commit 流程。 - 由
SyncRequestProcessor
进行事务日志的记录,并调用AckRequestProcessor
处理 Leader 自己的投票,此即 Sync 流程。这个流程会流向 Proposal 流程。
当 Leader 对非事务请求的处理流程到达此处时,由于不包含请求事务头,因此仅仅只是把请求提交给 CommitProcessor
。
1.4.1 Proposal 流程
org.apache.zookeeper.server.quorum.Leader#propose
1 | /** |
1.4.1.1 发起投票
如果当前请求是事务请求,那么 Leader 服务器就会发起一轮事务投票。在发起事务投票之前,会首先检查当前服务器的 ZXID 是否可用。
1.4.1.2 生成提案 Proposal
若 ZXID 可用,ZooKeeper 会将之前创建的请求头和事务体,以及 ZXID 和请求本身序列化到 Proposal
对象中 —— 此 Proposal
对象就是一个提案,即针对 ZooKeeper 服务器状态的一次变更申请。
1.4.1.3 广播提案
更新 lastProposed
,以 ZXID 作为 key 将该提案放入投票箱 outstandingProposals
中,同时将该提案广播给所有 Follower。
org.apache.zookeeper.server.quorum.Leader#sendPacket
1 | void sendPacket(QuorumPacket qp) { |
1.4.1.4 Follower 接收提案(Follower Sync 流程)
Follower 启动后,会通过 followLeader
方法不断从与 Leader 之间的连接中读取数据并作相应处理。
org.apache.zookeeper.server.quorum.Follower#processPacket
1 | /** |
org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#logRequest
1 | public void logRequest(TxnHeader hdr, Record txn) { |
到这里,Follower 将这个事务记录到 pendingTxns
中,并将事务请求提交给 syncProcessor
作异步处理,在 Follower 的 Sync 流程中对提案做响应并向 Leader 提交 ACK 信息。
1.4.1.5 Leader 统计投票
Leader 的 LearnerHandler
会接收来自各个 Follower 的 ACK 信息,并调用 Leader 的 org.apache.zookeeper.server.quorum.Leader#processAck
对投票做处理。
1 | /** |
当提案获得了集群中过半 PARTICIPANT 的投票,那么就认为该提案通过。
1.4.1.6 处理通过的提案
- 将提案的 ZXID 从
outstandingProposals
中移除。 - 将提案添加到
toBeApplied
队列。 - 向所有 Follower 发送
COMMIT
消息。由于 Follower 已经保存了所有关于该提案的信息,这里只需向其发送 ZXID 即可。 - 向所有 Observer 发送
INFORM
消息。由于 Observer 并未参与之前的投票阶段,因此 Observer 服务器并未保存任何关于该提案的信息。INFORM
消息中会包含当前提案的内容。 - 向
CommitProcessor
提交这个被通过的事务,进入 Leader 的 Commit 流程。
1.4.2 Sync 流程
Leader 在生成事务提案和 Follower 接收到事务提案时,都会将提案放入 SyncRequestProcessor
的提案队列 queuedRequests
,等待 SyncRequestProcessor
线程异步处理。 SyncRequestProcessor
处理器会记录事务日志,并提交给 nextProcessor
做后续处理。但是,Leader 和 Follower 的 SyncRequestProcessor
具有不同的 nextProcessor
实现。
1.4.2.1 Leader 的 Sync 流程
对于 Leader,其 SyncRequestProcessor
的 nextProcessor
是 AckRequestProcessor
。由于 Leader 自己也需要对事务进行投票,AckRequestProcessor
会用事务请求本身作为 ACK,并调用 Leader 的方法处理该 ACK。因此,Leader 的 Sync 流程最终会流向 Proposal 流程。
org.apache.zookeeper.server.quorum.AckRequestProcessor#processRequest
1 | /** |
1.4.2.2 Follower 的 Sync 流程
对于 Follower,其 SyncRequestProcessor
的 nextProcessor
是 SendAckRequestProcessor
。syncProcessor
进行事务日志的记录后,由 SendAckRequestProcessor
向 Leader 回复一个 ACK 消息。
org.apache.zookeeper.server.quorum.SendAckRequestProcessor#processRequest
1 | public void processRequest(Request si) { |
1.4.2.3 Observer 的 Sync 流程
虽然 Observer 会初始化 SyncRequestProcessor
,但由于 Leader 不会向 Observer 转发事务提案,因此 Observer 不存在 Sync 流程。
1.4.3 Commit 流程
org.apache.zookeeper.server.quorum.CommitProcessor#run
1 |
|
1.4.3.1 Leader 的 Commit 流程
将请求交付给
CommitProcessor
处理器。如前所述,Leader 在生成提案之前,会首先将生成的提案放到
CommitProcessor
的queuedRequests
队列中。处理
queuedRequests
队列请求。CommitProcessor
会有一个单独的线程来处理queuedRequests
队列中的请求。标记
nextPending
。若从
queuedRequests
中取出的请求是一个事务请求,则需要在集群中进行投票处理,同时将nextPending
标记为当前请求。等待 Proposal 投票。
在进行 Commit 流程的同时,Leader 会生成
Proposal
并广播给所有 Follower 服务器,此时,Commit 流程等待,直到投票结束。投票通过。
若提案获得过半 PARTICIPANT 认可,那么进入请求提交阶段。Leader 会将该请求放入
commitedRequests
队列中,同时唤醒 Commit 流程。提交请求。
若
commitedRequests
队列中存在可以提交的请求,那么 Commit 流程将请求放入toProcess
队列中。在这个过程中为了保证事务请求的顺序执行,Commit 流程还会对比之前标记的nextPending
和commitedRequests
队列中的第一个请求是否一致。在下一次循环中,toProcess
队列中的请求将被取出交付下一个请求处理器。对于 Leader 而言,下一个请求处理器是ToBeAppliedRequestProcessor
。
1.4.3.2 Follower 的 Commit 流程
org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#commit
1 | public void commit(long zxid) { |
Follower 在收到 COMMIT
消息时,会首先将该事务的 ZXID 与 pendingTxns
队列中缓存的事务对比,然后放入 CommitProcessor
的 committedRequests
队列。
Follower 的 CommitProcessor
将在两个队列中整理事务信息,在后续循环中提交给下一个请求处理器,即 FinalRequestProcessor
。
1.4.3.3 Observer 的 Commit 流程
org.apache.zookeeper.server.quorum.Observer#processPacket
1 | /** |
Observer 收到来自 Leader 的 INFORM
消息后的处理过程类似于 Follower。
1.5 事务应用
对于 Leader,事务由 CommitProcessor
提交给 ToBeAppliedRequestProcessor
,再由 ToBeAppliedRequestProcessor
提交给 FinalRequestProcessor
;对于 Follower 和 Observer,事务由 CommitProcessor
提交给 FinalRequestProcessor
。
有效性检查
FinalRequestProcessor
处理器检查outstandingChanges
队列中请求的有效性,如果发现这些请求已经落后于当前正在处理的请求,那么直接从outstandingChanges
队列中移除。事务应用
之前的请求处理仅仅是将事务请求记录到了事务日志中去,而内存数据库中的状态尚未变更。因此,在这个环节,需要将事务变更应用到内存数据库中。
org.apache.zookeeper.server.ZooKeeperServer#processTxn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
rc = getZKDatabase().processTxn(hdr, txn);
if (opCode == OpCode.createSession) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addSession(sessionId, cst.getTimeOut());
} else {
LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}对于会话创建这类事务请求,需要向
SessionTracker
进行会话注册。此时,一个客户端的会话被保存到了集群中的所有服务器上(但是注意,Leader 和 Learner 的SessionTracker
具有不同实现)。将事务放入
commitProposal
队列一旦完成事务请求的内存数据库应用,就可以将该请求放入
commitProposal
队列中。commitProposal
队列用来保存最近被提交的事务请求,以便集群间机器进行数据的快速同步。
1.6 会话响应
FinalRequestProcessor
继续处理对会话请求的响应。
统计处理
ZooKeeper 计算请求在服务端处理所花费的时间,统计客户端连接的基本信息,如
lastZxid
(最新的 ZXID)、lastOp
(最后一次和服务端的操作)和lastLatency
(最后一次请求处理所花费的时间)等。创建响应
ConnectResponse
ConnectResponse
就是一个会话创建成功后的响应,包含了当前客户端与服务端之间的通信协议版本号protocolVersion
、会话超时时间、sessionId
和会话密码。
- 序列化
ConnectResponse
- I/O 层发送响应给客户端
1.7 客户端处理请求响应
对于会话创建请求,客户端会调用 org.apache.zookeeper.ClientCnxn.SendThread#onConnected
方法,生成一个 None-SyncConnected
事件,交由 EventThread
处理:
1 | void onConnected(int _negotiatedSessionTimeout, long _sessionId, |
2. SetData 请求
服务端对于 SetData 请求的处理大致可以分为 4 步,分别是请求的预处理、事务处理、事务应用和请求响应。
2.1 预处理
I/O 层接收来自客户端的请求。
判断是否是客户端会话创建请求。对于 SetData 请求,由于已经完成了会话创建,因此按照正常事务请求进行处理。
将请求交给
PrepRequestProcessor
处理器进行处理。org.apache.zookeeper.server.PrepRequestProcessor#pRequest2Txn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// ...
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type);
switch (type) {
// ...
case OpCode.setData:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetDataRequest setDataRequest = (SetDataRequest)record;
if(deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
request.authInfo);
version = setDataRequest.getVersion();
int currentVersion = nodeRecord.stat.getVersion();
if (version != -1 && version != currentVersion) {
throw new KeeperException.BadVersionException(path);
}
version = currentVersion + 1;
request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
nodeRecord.stat.setVersion(version);
addChangeRecord(nodeRecord);
break;
// ...
}
// ...创建请求事务头。
会话检查。
检查该会话是否有效,即是否已经超时。
反序列化请求,并创建
ChangeRecord
记录。ZooKeeper 首先会对请求反序列化并生成特定的
SetDataRequest
请求,请求中包含了数据节点路径 path、更新的内容 data 和期望的数据节点版本 version。同时根据请求对应的 path,Zookeeper 会生成一个ChangeRecord
记录。ACL检查。检查客户端是否具有数据更新的权限。
数据版本检查。
ZooKeeper 通过 version 属性来实现乐观锁机制的写入校验。
创建请求事务体
SetDataTxn
。保存
ChangeRecord
记录到outstandingChanges
队列中。
2.2 事务处理
参见会话创建的事务处理阶段。
2.3 事务应用
交付给
FinalRequestProcessor
处理器。事务应用。
将请求事务头和事务体直接交给内存数据库
ZKDatabase
进行事务应用,同时返回ProcessTxnResult
对象,包含了数据节点内容更新后的 stat。将事务请求放入
commitProposal
队列。
2.4 请求响应
统计处理。
创建响应体
SetDataResponse
。其包含了当前数据节点的最新状态 stat。
创建响应头。
包含当前响应对应的事务 ZXID 和请求处理是否成功的标识。
序列化响应。
I/O层发送响应给客户端。
3. GetData 请求
服务端对于 GetData
请求的处理,大致分为 3 步,分别是请求的预处理、非事务处理和请求响应。
3.1 预处理
- I/O 层接收来自客户端的请求。
- 判断是否是客户端会话创建请求。
- 会话检查。
- 将请求提交给
firstProcessor
。- 对于 Leader,
PreRequestProcessor
再次检查会话,然后交给ProposalRequestProcessor
。由于这种情况下请求事务头为 null,Leader 将提交请求给CommitProcessor
并忽略 Proposal 和 Sync 阶段。 - 对于 Learner,提交请求给
CommitProcessor
。
- 对于 Leader,
3.2 非事务处理
- 反序列化
GetDataRequest
请求。 - 获取数据节点。
- ACL检查。
- 获取数据内容和 stat,注册
Watcher
。
3.3 请求响应
- 创建响应体
GetDataResponse
。响应体包含当前数据节点的内容和状态 stat。 - 创建响应头。
- 统计处理。
- 序列化响应。
- I/O层发送响应给客户端。
4. 自问自答
4.1 Learner 如何处理事务请求?
当一个 Learner 收到客户端的事务请求时,会通过 REQUEST 消息转发给 Leader。Leader 的 LearnerHandler
收到消息后,会提交给 PreRequestProcessor
,进入预处理阶段。由于该请求不是来自于与 Leader 相连的客户端的,因此相比于完整流程,跳过了前面的会话创建阶段。
4.2 在事务处理的过程中,Follower 会收到哪些消息?
如何客户端连接的是一个 Follower,整个流程中该 Follower 会收到:
- 来自客户端的事务请求。由
FollowerRequestProcessor
处理,发送 REQUEST 消息给 Leader,并添加到CommitProcessor
的queuedRequests
队列。
不论客户端是向哪台服务器提交的事务请求,所有 Follower 都会收到:
- 来自 Leader 的事务提案。由
FollowerZooKeeperServer
交给SyncRequestProcessor
处理,提交到SendAckRequestProcessor
,向 Leader 回复 ACK。 - 来自 Leader 的
COMMIT
消息。由FollowerZooKeeperServer
添加事务请求到CommitProcesser
的committedRequests
队列,并在接下来提交到FinalRequestProcessor
。
4.2 在事务处理的过程中,Observer 会收到哪些消息?
如何客户端连接的是一个 Observer,整个流程中该 Observer 会收到:
- 来自客户端的事务请求。由
ObserverRequestProcessor
处理,发送 REQUEST 消息给 Leader,并记录到CommitProcessor
的queuedRequests
队列。
不论客户端是向哪台服务器提交的事务请求,所有 Observer 都会收到:
- 来自 Leader 的
INFORM
消息。由ObserverZooKeeperServer
添加事务请求到CommitProcesser
的committedRequests
队列,并在接下来提交到FinalRequestProcessor
。
4.3 Leader 是否回复来自 Learner 的 REQUEST 消息?
Leader 不会对 Learner 的 REQUEST 消息做回复。请求处理结果由 Leader 向所有 Learner 发送确认信息(COMMIT
或 INFORM
)传达。
4.4 如何保证只由接收客户端事务请求的那台服务器来对客户端发送响应?
对于接收客户端事务请求的服务器,在流程中流转时,会创建一个
Request
对象,其cnxn
属性被设置为处理该客户端请求的NIOServerCnxn
实例。这个对象最终被添加到CommitProcessor
的queuedRequests
队列中,等待 Leader 确认事务处理结果。其他服务器不会执行这一个步骤。对于 Leader:
如果客户端请求是直接发送给 Leader 的,如前所述,Leader 会创建一个
Request
对象,其cnxn
属性被设置为处理该客户端请求的NIOServerCnxn
实例,然后调用org.apache.zookeeper.server.ZooKeeperServer#submitRequest(org.apache.zookeeper.server.Request)
。如果客户端请求不是直接发送给 Leader 的,那么 Leader 会收到来自某一个 Learner 的 REQUEST 请求。Leader 会创建一个
Request
对象,其cnxn
属性为 null,然后调用org.apache.zookeeper.server.ZooKeeperServer#submitRequest(org.apache.zookeeper.server.Request)
。org.apache.zookeeper.server.quorum.LearnerHandler#run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void run() {
// ...
while (true) {
switch (qp.getType()) {
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
leader.zk.submitRequest(si);
break;
}
}
}无论哪种情况,这个
Request
将在 Leader 的 Commit 流程中被 Leader 添加到CommitProcessor
的queuedRequests
队列中,等待集群投票结果。无论哪种情况,这个
Request
将在 Leader 的 Proposal 流程中被 Leader 添加到CommitProcessor
的committedRequests
队列中,等待事务应用。
对于 Follower:
- 在 Leader 向 Follower 提交事务提案后,也会创建一个
Request
对象,但其cnxn
属性被设置为 null,然后将其添加到FollowerZooKeeperServer
的pendingTxn
队列中。 - 在 Leader 向 Follower 正式提交事务(COMMIT)后,会从
pendingTxn
队列取出该Request
对象,放入CommitProcesser
的committedRequests
队列中。
- 在 Leader 向 Follower 提交事务提案后,也会创建一个
对于 Observer:
- 在 Leader 向 Follower 正式提交事务(INFORM)后,会创建一个
Request
对象,但其cnxn
属性被设置为 null,放入CommitProcesser
的committedRequests
队列中。
- 在 Leader 向 Follower 正式提交事务(INFORM)后,会创建一个
综上所述,对于服务器:
- 如果自己是收到客户端请求的那个服务器,那么自己的
CommitProcesser
的queuedRequests
队列中都会包含一个待提交的事务请求,其cnxn
属性为客户端连接对应的NIOServerCnxn
实例。 - 如果自己不是收到客户端请求的那个服务器,那么自己的
CommitProcesser
的committedRequests
队列中都会包含一个待提交的事务请求,其cnxn
属性为 null。
- 如果自己是收到客户端请求的那个服务器,那么自己的
在
CommitProcesser
整理请求信息的过程中,会优先考虑queuedRequests
队列中的Request
对象。因此,如果自己是收到客户端请求的那个服务器,那么提交给FinalRequestProcessor
的Requet
对象的cnxn
属性不为 null;反之则为 null。在
FinalRequestProcessor
的处理过程中,各服务器首先完成事务应用。这是将做一次判断,只有当传入的Request
对象的cnxn
参数不为 null 时,才会继续进行后续的会话响应操作。最终,集群中的所有服务器都提交并应用了事务,但只有客户端所连接的那个服务器才会对客户端进行响应。