ZooKeeper 客户端主要由以下几个核心组件组成:
ZooKeeper
实例:客户端的入口ClientWatchManager
:客户端Watcher管理器HostProvider
:客户端地址列表管理器ClientCnxn
:客户端核心线程
ZooKeeper 客户端的构造方法有以下几种:
1 | public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) |
这些构造方法的最终根据是否重用 session 有两种实现:
1 | private final ZKWatchManager watchManager = new ZKWatchManager(); |
1. 一次会话的创建过程
1.1 初始化阶段
客户端的初始化过程分为以下几个步骤:
初始化 ZooKeeper 对象。
通过调用 ZooKeeper 的构造方法来实例化一个 ZooKeeper 对象,在初始化过程中,会创建一个客户端的 Watcher 管理器:
ClientWatchManager
。设置会话默认 Watcher。
如果在构造方法中传入了一个 Watcher 对象,那么客户端会将这个对象作为默认 Watcher 保存在
ClientWatchManager
中。构造 ZooKeeper 服务器地址列表管理器:
HostProvider
。对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器
HostProvider
中。创建并初始化客户端网络连接器:
ClientCnxn
。ZooKeeper 客户端首先会创建一个网络连接器
ClientCnxn
,用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn
的同时,还会初始化两个核心队列outgoingQueue
和pendingQueue
,分别作为客户端请求的发送队列和服务端响应的等待队列。ClientCnxn
的构造和启动方法如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
//启动 SendThread 和 EventThread
public void start() {
sendThread.start();
eventThread.start();
}初始化
SendThread
和EventThread
。ClientCnxn
还会创建两个核心网络线程SendThread
和EventThread
,前者用于管理客户端和服务端之间的所有网络 I/O,后者则用于进行客户端的事件处理。同时,客户端还会将ClientCnxnSocket
分配给SendThread
作为底层网络 I/O 处理器,并初始化EventThread
的待处理事件队列waitingEvents
,用于存放所有等待被客户端处理的事件。SendThread
:1
2
3
4
5
6
7SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
//设置当前状态为 CONNECTING
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}EventThread
:1
2
3
4
5private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true);
}
1.2 会话创建阶段
启动
SendThread
和EventThread
。SendThread
首先会判断当前客户端的状态,进行一系列请理性工作。获取一个服务器地址。
在开始创建 TCP之前,
SendThread
首先需要获取一个 ZooKeeper 服务器的目标地址,这通常是从HostProvider
中随机获取出一个地址,然后委托给ClientCnxnSocket
去创建与 ZooKeeper 服务器之间的 TCP 连接。创建TCP连接。
获取一个服务器地址后,
ClientCnxnSocket
负责和服务器创建一个 TCP 长连接。构造
ConnectRequest
请求。以上步骤后,
ClientCnxnSocket
和服务器之间创建了一个 TCP 长连接,但和 ZooKeeper 服务器之间的会话创建尚未完成。ClientCnxnSocket
会进一步调用SendThread
的primeConnection()
方法,构造一个ConnectRequest
请求。该请求代表了客户端试图和服务端之间创建一个会话。同时,将该请求包装成Packet
对象,放入请求发送队列outgoingQueue
中去。发送请求。
ClientCnxnSocket
负责从outgoingQueue
中取出一个待发送的Packet
对象,将其序列化成ByteBuffer
后,向服务端进行发送。
1.3 响应处理阶段
接受服务器端响应。
ClientCnxnSocket
接受到服务端响应后,会首先判断当前的客户端状态是否是 “已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult
方法来处理该响应。处理 Response。
ClientCnxnSocket
会对接受到的服务端响应进行反序列化,得到ConnectResponse
对象,并从中获取到 ZooKeeper 服务端分配的会话sessionId
。连接成功。
连接成功后,一方面需要通知
SendThread
线程,进一步对客户端进行会话参数的设置,包括readTimeout
和connectTimeout
等,并更新客户端状态,另一方面,需要通知地址管理器HostProvider
当前成功连接的服务器地址。生成事件:
SyncConnected-None
。为了能够让上层应用感知到会话的成功创建,
SendThread
会生成一个事件SyncConnected-None
,代表客户端与服务器会话创建成功,并将该事件传递给EventThread
线程。查询
Watcher
。EventThread
线程收到事件后,会从ClientWatchManager
管理器中查询出对应的Watcher
,针对SyncConnected-None
事件,那么就直接找出存储的默认Watcher
,然后将其放到EventThread
的watingEvents
队列中去。处理事件。
EventThread
不断的从watingEvents
队列中取出待处理的Watcher
对象,然后直接调用该对象的process
接口方法,以达到触发Watcher
的目的。
2. 服务器地址列表
2.1 Chroot:客户端隔离命名空间
在 3.2.0 之后版本的 ZooKeeper 中,添加了 “Chroot” 特性,该特性允许每个客户端为自己设置一个命名空间。如果一个 ZooKeeper 客户端设置了 Chroot
,那么该客户端对服务器的任何操作,都将会被限制在自己的命名空间下。
客户端可以通过在 connectString
中添加后缀的方式来设置 Chroot
,如下所示:
1 | 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181/apps/X |
将这样一个 connectString
传入客户端的 ConnectStringParser
后就能够解析出 Chroot
并保存在 chrootPath
属性中。
2.2 HostProvider:地址列表管理器
HostProvider
的默认实现是 StaticHostProvider
。通过调用 staticHostProvider
的 next()
方法,能够从 StaticHostProvider
中获取一个可用的服务器地址。这个 next()
方法并非简单地从 serverAddresses
中一次获取一个服务器地址,而是先将随机打散后的服务器地址列表拼装成一个环形的循环队列。注意这个随机过程是一次性的,也就是说,之后的使用过程中一直是按照这样的顺利来获取服务器地址的。
3. ClientCnxn:网络 I/O
3.1 Packet
Packet
是 ClientCnxn
内部定义的一个堆协议层的封装,用作 ZooKeeper 中请求和响应的载体。Packet
包含了请求头(requestHeader
)、响应头(replyHeader
)、请求体(request
)、响应体(response
)、节点路径(clientPath/serverPath
)、注册的 Watcher
(watchRegistration
)等信息,然而,并非 Packet
中所有的属性都在客户端与服务端之间进行网络传输,只会将 requestHeader
、request
、readOnly
三个属性序列化,并生成可用于底层网络传输的 ByteBuffer
,其他属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。
org.apache.zookeeper.ClientCnxn.Packet#createBB
1 | public void createBB() { |
3.2 outgoingQueue 和 pendingQueue
ClientCnxn
维护着 outgoingQueue
(客户端的请求发送队列)和 pendingQueue
(服务端响应的等待队列),outgoingQueue
专门用于存储那些需要发送到服务端的 Packet
集合,pendingQueue
用于存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的 Packet
集合。
3.3 ClientCnxnSocket:底层 Socket 通信层
在 ZooKeeper 中,ClientCnxnSocket
的默认实现是 ClientCnxnSocketNIO
,该实现类使用 Java 原生的 NIO 接口,其核心是 doIO
逻辑,主要负责对请求的发送和响应接收过程。
SendThread
线程中会循环调用 org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
1 |
|
org.apache.zookeeper.ClientCnxnSocketNIO#doIO
1 | void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) |
3.3.1 请求发送
客户端提交请求:
org.apache.zookeeper.ClientCnxn#submitRequest
1 | public ReplyHeader submitRequest(RequestHeader h, Record request, |
org.apache.zookeeper.ClientCnxn#queuePacket
1 | Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, |
在正常情况下,ClientCnxnSocket
会从 outgoingQueue
中取出一个可发送的 Packet
对象,同时生成一个客户端请求序号 XID 并将其设置到 Packet
请求头中去,然后将其序列化后进行发送。
请求发送完毕后,会立即将该 Packet
保存到 pendingQueue
中,以便等待服务端响应返回后进行相应的处理。
3.3.2 响应接收
客户端获取到来自服务端的完整响应数据后,根据不同的客户端请求类型,会进行不同的处理。
- 如果检测到当前客户端尚未进行初始化,那么说明当前客户端与服务端之间正在进行会话创建,那么就直接将接收到的
ByteBuffer
(incomingBuffer
)序列化成ConnectResponse
对象。 - 如果当前客户端已经处于正常的会话周期,并且接收到的服务端响应是一个事件,那么客户端将接收到的
ByteBuffer
序列化成WatcherEvent
对象,并将该事件放入待处理队列中。 - 如果是一个常规的请求响应(
Create
、GetData
、Exist
等),那么会从pendingQueue
队列中取出一个Packet
来进行相应的处理。客户端首先会通过检验服务端响应中的 XID 来确保请求处理的顺序性,然后再将接收到的ByteBuffer
序列化成Response
对象。 - 最后,会在
finishPacket
方法中处理Watcher
注册等逻辑。
3.4 SendThread
SendThread
是客户端 ClientCnxn
内部一个核心的 I/O 调度线程,用于管理客户端和服务端之间的所有网络 I/O 操作。在 ZooKeeper 客户端的实际运行过程中,一方面,SendThread
维护了客户端与服务端之间的会话生命周期,其通过在—定的周期频率内向服务端发送一个 PING
包来实现心跳检测。同时,在会话周期内,如果客户端与服务端之间出现 TCP 连接断开的怙况,那么就会自动且透明化地完成重连操作。
另一方面,SendThread
管理了客户端所有的请求发送和响应接收操作,其将上层客户端 API 操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。同时,SendThread
还负责将来自服务端的事件传递给 EventThread
去处理。
3.5 EventThread
EventThread
是客户端 ClientCnxn
内部的另一个核心线程,负责客户端的事件处理,并触发客户端注册的 Watcher
监听。EventThread
中有一个 waitingEvents
队列,用于临时存放那些需要被触发的 Object,包括那些客户端注册的 Watcher
和异步接口中注册的回调器 AsyncCallback
。同时,EventThread
会不断地从 waitingEvents
这个队列中取出 Object,识别出具体类型(Watcher
或者 AsyncCallback
),并分别调用 process
和 processResult
接口方法来实现对事件的触发和回调。