Apache ZooKeeper 是一个开源的分布式协调服务,用于分布式系统中维护配置信息、命名、提供分布式同步和组服务。下面根据您的要求,详细说明 ZooKeeper 的产生原因、最初使用、最新的使用、不同版本的更新点、实现原理、部署和使用。内容基于官方文档和可靠来源整理,并包含 Java 代码片段示例(ZooKeeper 主要使用 Java API)。
1. 产生原因(Creation Reason)
ZooKeeper 的产生是为了解决分布式系统中协调的复杂问题。在大规模分布式系统中,多个节点需要协调配置、状态同步、领导者选举等任务,但自行实现这些功能容易引入错误,如竞争条件(race conditions)和死锁(deadlock)。ZooKeeper 受 Google 的 Chubby lock service 启发,由 Yahoo! Research 团队开发,用于简化这些协调任务。它提供了一个可靠的、高性能的协调内核,让应用程序开发者专注于业务逻辑,而非重新发明分布式协调机制。最初是为了管理 Yahoo! 的大数据集群而创建,将状态存储在本地日志文件中,确保高可用性和一致性。
2. 最初的使用(Initial Use)
ZooKeeper 最初在 Yahoo! 内部用于工业级应用,包括:
- Yahoo! Message Broker:协调和故障恢复服务,用于管理数千个主题的可扩展发布-订阅系统。
- Yahoo! Crawler 的 Fetching Service:用于故障恢复,确保爬虫任务的可靠执行。
- Yahoo! 广告系统:提供可靠的服务协调,如命名服务、配置管理和数据同步。 典型早期用例包括命名服务(类似 DNS)、配置管理(集中存储配置)、数据同步(锁机制)、领导者选举(选主)和消息队列。ZooKeeper 被设计为读主导型(读写比约 10:1),适用于运行在数千台机器上的分布式环境。
3. 最新的使用(Latest Use)
如今,ZooKeeper 广泛用于大数据和分布式系统中,作为协调内核。常见用例包括:
- 配置管理:集中存储和更新分布式应用的配置(如 Apache Kafka 用于存储消费者偏移量,直到 4.0 版本)。
- 领导者选举:在集群中选举主节点(如 Apache HBase 用于区域分配和主故障转移)。
- 分布式锁:实现互斥访问(如 Apache Accumulo 用于无单点故障架构)。
- 组成员管理:跟踪节点加入/离开(如 Apache Druid 用于集群状态管理)。
- 其他:用于 Apache Hadoop、HDFS、Solr、Kafka(早期版本)、Pulsar 等。最新趋势包括减少对 ZooKeeper 的依赖(如 Pulsar 通过 PIP-45 引入可插拔元数据框架,允许无 ZooKeeper 运行),但在传统系统中仍不可或缺。现代应用强调其在云环境中的高可用性,如在 Kubernetes 中协调微服务。
4. 不同版本的更新点(Update Points of Different Versions)
ZooKeeper 的版本演进聚焦于性能、安全、兼容性和新功能。以下表格总结从 3.4.x 开始的主要版本更新(基于官方发布笔记,当前稳定版 3.8.x,当前版 3.9.x)。EoL(End-of-Life)版本不再接收社区支持。
| 版本系列 | 首次发布日期 | EoL 日期 | 主要更新点 |
|---|---|---|---|
| 3.4.x | 2011 年 | 2016 年 | 基础稳定版;支持基本 API、复制模式;性能优化;用于 Hadoop 子项目。 |
| 3.5.x | 2019 年 5 月 (3.5.5 作为稳定版) | 2022 年 6 月 | 添加动态重配置、本地会话、容器/TTL 节点、SSL 支持原子广播协议、可移除监视器、多线程提交处理器、升级 Netty 4.1、Maven 构建;最小 JDK 1.8;修复 CVE 和兼容性问题。 |
| 3.6.x | 2020 年 3 月 | 2022 年 12 月 | 性能和安全改进;新 API(如永久递归监视);移除 Log4j1,使用 reload4j;修复 CVE、快照和 SASL 问题;客户端兼容 3.5.x 服务器。 |
| 3.7.x | 2021 年 3 月 | 2024 年 2 月 | 新 API(如启动服务器、whoami);配额强制;主机名规范化;BCKFS 密钥/信任存储;必选认证方案;多 SASL superUsers;快速跟踪节流请求;安全指标;C/Perl SASL 支持;zkSnapshotComparer 工具;YCSB 基准测试说明;修复 64+ 个问题,包括 CVE。 |
| 3.8.x (当前稳定) | 2022 年 3 月 | - | 日志框架迁移到 LogBack;从文件读取密钥/信任存储密码;恢复 OSGI 支持;减少 Prometheus 指标性能影响;JDK17 支持;第三方依赖更新修复所有 CVE;修复同步、C 客户端测试等问题。 |
| 3.9.x (当前) | 2023 年 8 月 | - | 管理员服务器 API(快照和数据流出);通信 Zxid 触发 WatchEvent;TLS 动态加载客户端信任/密钥存储;Netty-TcNative OpenSSL 支持;SSL 支持 Zktreeutil;改进 syncRequestProcessor 性能;第三方依赖更新修复 CVE。 |
兼容性:3.5.x+ 客户端兼容 3.9.x 服务器;3.9.x 客户端兼容 3.5-3.8.x 服务器(不使用新 API)。
5. 实现的原理(Implementation Principles)
ZooKeeper 的核心是提供一个简单、高可靠的分布式协调服务。其架构基于客户端-服务器模型,使用 ZAB(ZooKeeper Atomic Broadcast)协议(类似于 Paxos)实现一致性。
- 架构组件:
- Ensemble(集群):由奇数个服务器组成(最小 3 个),确保多数派(quorum)可用。包括 Leader(领导者,处理写操作)、Follower(追随者,处理读操作并转发写到 Leader)和 Observer(观察者,只处理读,不参与选举/投票,提高读性能)。
- 数据模型:分层命名空间,像文件系统(znodes:节点,可存储数据和子节点)。支持持久节点(persistent)和临时节点(ephemeral,会话结束删除)。数据在内存中存储(高性能),并持久化到日志和快照。
- 一致性保证:顺序一致性(更新按发送顺序应用)、原子性(更新全成功或全失败)、单一系统映像(客户端无论连接哪个服务器,看到相同视图)、可靠性(更新持久化)、及时性(视图在界限内更新)。
- 工作流程:客户端连接任意服务器。写请求转发到 Leader,通过 ZAB 广播到 Follower(需多数同意)。读请求本地处理。使用监视(watches)通知变化(一次性触发,新版支持永久递归监视)。会话(sessions)通过心跳维护,断连自动重连。
- 领导者选举:使用快速 Paxos 变体,崩溃时快速选举新 Leader(<200ms)。
- 性能原理:内存镜像 + 事务日志;读主导优化;原子消息协议防止副本分歧。
ZooKeeper 适用于读多写少场景,提供简单 API(如 create、delete、get、set)。
6. 使用 ZooKeeper 的集群部署 Java 服务示例:实现领导者选举
在分布式系统中,ZooKeeper(ZK)常用于协调集群中的节点,确保高可用性和一致性。下面我给出一个详细的例子:一个简单的 分布式任务调度服务,部署在集群中,使用 ZooKeeper 实现 领导者选举(Leader Election)。这个服务模拟一个定时任务(如数据备份),但只有一个节点(Leader)执行任务,其他节点(Follower)待命。如果 Leader 宕机,Follower 会自动选举新 Leader。
一个 3 节点集群的分布式任务调度服务
- 3 台服务器(物理机或虚拟机):node1、node2、node3
- 每个节点运行一个相同的 Java JAR 包
- 使用 ZooKeeper 实现领导者选举:只有一个节点成为 Leader 执行定时任务,其他节点作为 Follower 待命
- Leader 宕机后,自动快速选举新 Leader
环境准备(3 台服务器)
| 主机名 | IP | 角色 | 说明 |
|---|---|---|---|
| node1 | 192.168.1.101 | ZooKeeper + Java 服务 | ZooKeeper myid=1 |
| node2 | 192.168.1.102 | ZooKeeper + Java 服务 | ZooKeeper myid=2 |
| node3 | 192.168.1.103 | ZooKeeper + Java 服务 | ZooKeeper myid=3 |
所有节点安装 Java
sudo apt update sudo apt install openjdk-11-jdk # Ubuntu/Debian# 或 CentOS sudo yum install java-11-openjdk-devel
所有节点安装 ZooKeeper 集群
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz sudo mv apache-zookeeper-3.8.4-bin /opt/zookeeper cd /opt/zookeeper
配置 conf/zoo.cfg(所有节点都相同)
cp conf/zoo_sample.cfg conf/zoo.cfg
vi conf/zoo.cfg
内容如下
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/var/lib/zookeeper clientPort=2181 # 集群服务器列表 server.1=192.168.1.101:2888:3888 server.2=192.168.1.102:2888:3888 server.3=192.168.1.103:2888:3888
创建数据目录并设置 myid(每个节点不同):
sudo mkdir -p /var/lib/zookeeper
sudo chown -R $USER:$USER /var/lib/zookeeper
node1:
echo "1" > /var/lib/zookeeper/myid
node2:
echo "2" > /var/lib/zookeeper/myid
node3:
echo "3" > /var/lib/zookeeper/myid
启动 ZooKeeper(所有节点):
/opt/zookeeper/bin/zkServer.sh start
验证集群状态:
/opt/zookeeper/bin/zkServer.sh status
应该看到一个 Leader 和两个 Follower。
JAVA代码
LeaderElection.java(领导者选举核心),处理连接 ZooKeeper、创建节点、监视变化和选举逻辑。
package com.example;import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;public class LeaderElection implements Watcher {private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class);private ZooKeeper zk;private String zkConnectString;private int sessionTimeout;private String electionPath;private String nodeId;private String currentZnodePath; // 当前节点的路径,如 /election/node-0000000001private CountDownLatch connectedLatch = new CountDownLatch(1);private TaskService taskService; // 任务服务引用public LeaderElection(String zkConnectString, int sessionTimeout, String electionPath, String nodeId, TaskService taskService) {this.zkConnectString = zkConnectString;this.sessionTimeout = sessionTimeout;this.electionPath = electionPath;this.nodeId = nodeId;this.taskService = taskService;}public void connect() throws IOException, InterruptedException {zk = new ZooKeeper(zkConnectString, sessionTimeout, this);connectedLatch.await(); // 等待连接成功 }@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {connectedLatch.countDown();} else if (event.getType() == Event.EventType.NodeDeleted) {// 前一个节点删除,重新检查是否成为 Leadertry {checkIfLeader();} catch (KeeperException | InterruptedException e) {logger.error("Error checking leader", e);}}}public void participateInElection() throws KeeperException, InterruptedException {// 确保选举路径存在(持久节点)Stat stat = zk.exists(electionPath, false);if (stat == null) {zk.create(electionPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}// 创建临时顺序节点currentZnodePath = zk.create(electionPath + "/" + nodeId + "-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);logger.info("Created znode: {}", currentZnodePath);// 检查是否是 Leader checkIfLeader();}private void checkIfLeader() throws KeeperException, InterruptedException {// 获取所有子节点,按序号排序List<String> children = zk.getChildren(electionPath, false);Collections.sort(children);// 当前节点是序号最小的,就是 LeaderString smallestChild = children.get(0);if (currentZnodePath.endsWith(smallestChild)) {logger.info("I am the Leader: {}", currentZnodePath);taskService.startTask(); // 开始执行任务} else {// 监视前一个节点int myIndex = children.indexOf(currentZnodePath.substring(electionPath.length() + 1));String previousChild = children.get(myIndex - 1);zk.exists(electionPath + "/" + previousChild, this); // 设置监视logger.info("I am Follower, watching: {}", previousChild);taskService.stopTask(); // 停止任务(如果之前是 Leader) }}public void close() throws InterruptedException {zk.close();} }
TaskService.java(任务执行服务),模拟一个定时任务。只有 Leader 执行。
package com.example;import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;public class TaskService {private static final Logger logger = LoggerFactory.getLogger(TaskService.class);private ScheduledExecutorService executor;private boolean isRunning = false;public void startTask() {if (!isRunning) {executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {logger.info("Executing task: Backup database..."); // 模拟任务}, 0, 60, TimeUnit.SECONDS); // 每分钟执行isRunning = true;}}public void stopTask() {if (isRunning && executor != null) {executor.shutdown();isRunning = false;logger.info("Stopped task");}} }
App.java(主入口)
package com.example;import java.io.IOException; import java.util.Properties;public class App {public static void main(String[] args) throws IOException, InterruptedException, Exception {// 加载配置(实际可使用 Spring 或环境变量)Properties props = new Properties();props.load(App.class.getClassLoader().getResourceAsStream("application.properties"));String zkConnect = props.getProperty("zk.connectString");int sessionTimeout = Integer.parseInt(props.getProperty("zk.sessionTimeout"));String electionPath = props.getProperty("election.path");String nodeId = props.getProperty("node.id"); // 每个实例不同 TaskService taskService = new TaskService();LeaderElection election = new LeaderElection(zkConnect, sessionTimeout, electionPath, nodeId, taskService);election.connect();election.participateInElection();// 保持运行(生产中用 Spring Boot 或 while(true)) Thread.sleep(Long.MAX_VALUE);election.close();} }
application.properties
zk.connectString=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 zk.sessionTimeout=5000 zk.connectionTimeout=3000 election.path=/election # 每个节点手动设置不同的 node.id # node1: node-1 # node2: node-2 # node3: node-3 node.id=node-1 # 启动时根据节点修改
项目大包部署到三台服务器指定目录下(/home/user)。
在每台服务器上创建启动脚本( nohup + 脚本)
cd /home/user# 创建启动脚本 start.sh(node1 示例) cat > start.sh << 'EOF' #!/bin/bash# 节点 ID(每个服务器不同) NODE_ID="node-1" # node2 改为 node-2,node3 改为 node-3nohup java -jar \-Dnode.id=${NODE_ID} \distributed-task-service-1.0-SNAPSHOT.jar \> service.log 2>&1 &echo "Started with node.id=${NODE_ID}" EOFchmod +x start.sh
每台服务器启动服务
./start.sh
日志查看
tail -f service.log
可以看到类似输出:
- 一个节点会打印:I am the Leader: /election/node-1-0000000001
- 另外两个节点:I am Follower, watching: node-?-0000000000
只有 Leader 会每分钟打印:Executing task: Backup database...
测试故障转移
查看当前 Leader(假设是 node1):
tail -f /home/user/service.log | grep "I am the Leader"
杀掉 Leader 进程(node1):
ps -ef | grep java kill -9 <pid>
观察其他节点日志:
- 几百毫秒内,其中一个 Follower 会成为新 Leader,并开始执行任务。
- 原来的 Follower 继续监视新 Leader。
总结
- 操作流程:3 台机器 → 安装 ZK → 复制 JAR → 修改 node.id → 启动脚本
- 高可用:ZooKeeper 保证领导者选举快速、可靠
- 可扩展:想加更多节点,只需复制 JAR + 修改 node.id + 启动即可