大家好,我是小悟。
一、Zookeeper简介
1.1 什么是Zookeeper
Zookeeper是一个分布式的、开源的分布式应用程序协调服务,是Google的Chubby一个开源的实现。它提供了一组简单的原语,使得分布式应用能够基于这些原语实现更高层次的服务,如:分布式锁、配置管理、命名服务、集群管理等。
1.2 Zookeeper的核心特性
- 顺序一致性:客户端发起的更新请求,会按照其发出的顺序被应用到Zookeeper
- 原子性:更新操作要么全部成功,要么全部失败
- 单一系统映像:无论客户端连接到哪个服务器,都会看到相同的服务视图
- 可靠性:一旦更新被应用,就会一直保持,直到被覆盖
- 及时性:保证客户端在特定时间范围内获得最新的数据
1.3 Zookeeper的数据结构
Zookeeper的数据模型类似于Unix文件系统,采用层次化的树形结构,每个节点称为znode,可以存储数据和子节点。
二、详细实现步骤
2.1 环境准备
2.1.1 安装Zookeeper
# 下载Zookeeper(以3.7.0为例) wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz # 解压 tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz cd apache-zookeeper-3.7.0-bin # 创建数据目录和日志目录 mkdir data mkdir logs # 复制配置文件 cp conf/zoo_sample.cfg conf/zoo.cfg # 修改配置 vi conf/zoo.cfg配置内容:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/path/to/your/zookeeper/data dataLogDir=/path/to/your/zookeeper/logs clientPort=2181 # 如果是集群,添加如下配置 # server.1=node1:2888:3888 # server.2=node2:2888:3888 # server.3=node3:2888:38882.1.2 启动Zookeeper
# 启动Zookeeper bin/zkServer.sh start # 查看状态 bin/zkServer.sh status # 客户端连接 bin/zkCli.sh -server 127.0.0.1:21812.2 SpringBoot项目搭建
2.2.1 创建SpringBoot项目
使用Spring Initializr创建项目,添加以下依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.0</version> <relativePath/> </parent> <groupId>com.example</groupId> <artifactId>springboot-zookeeper-demo</artifactId> <version>1.0.0</version> <properties> <java.version>11</java.version> </properties> <dependencies> <!-- Spring Boot Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Zookeeper Curator --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> </dependency> <!-- Zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- 配置处理器 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!-- Lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- 测试 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>2.3 Zookeeper配置类
2.3.1 配置文件
# application.yml server: port: 8080 spring: application: name: zookeeper-demo zookeeper: connect-string: localhost:2181 session-timeout: 5000 connection-timeout: 5000 base-sleep-time: 1000 max-retries: 3 max-sleep-time: 5000 namespace: springboot2.3.2 配置属性类
package com.example.zookeeper.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties(prefix = "zookeeper") public class ZookeeperProperties { /** * 连接字符串 */ private String connectString = "localhost:2181"; /** * 会话超时时间(毫秒) */ private int sessionTimeout = 5000; /** * 连接超时时间(毫秒) */ private int connectionTimeout = 5000; /** * 重试基础睡眠时间(毫秒) */ private int baseSleepTime = 1000; /** * 最大重试次数 */ private int maxRetries = 3; /** * 最大睡眠时间(毫秒) */ private int maxSleepTime = 5000; /** * 命名空间 */ private String namespace = "springboot"; }2.3.3 Zookeeper配置类
package com.example.zookeeper.config; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @Configuration public class ZookeeperConfig { @Autowired private ZookeeperProperties zookeeperProperties; private CuratorFramework client; @Bean public CuratorFramework curatorFramework() { // 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry( zookeeperProperties.getBaseSleepTime(), zookeeperProperties.getMaxRetries(), zookeeperProperties.getMaxSleepTime() ); // 创建CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(zookeeperProperties.getConnectString()) .sessionTimeoutMs(zookeeperProperties.getSessionTimeout()) .connectionTimeoutMs(zookeeperProperties.getConnectionTimeout()) .retryPolicy(retryPolicy) .namespace(zookeeperProperties.getNamespace()) .build(); return client; } @PostConstruct public void init() { // 启动客户端 client.start(); System.out.println("Zookeeper客户端已启动,连接地址: " + zookeeperProperties.getConnectString()); } @PreDestroy public void destroy() { if (client != null) { client.close(); System.out.println("Zookeeper客户端已关闭"); } } }2.4 实现分布式锁
2.4.1 分布式锁工具类
package com.example.zookeeper.lock; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Component public class DistributedLock { @Autowired private CuratorFramework curatorFramework; /** * 获取分布式锁 * @param lockPath 锁路径 * @param waitTime 等待时间 * @param timeUnit 时间单位 * @return 锁对象 */ public InterProcessMutex acquireLock(String lockPath, long waitTime, TimeUnit timeUnit) { InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); try { if (lock.acquire(waitTime, timeUnit)) { return lock; } } catch (Exception e) { throw new RuntimeException("获取分布式锁失败", e); } return null; } /** * 释放分布式锁 * @param lock 锁对象 */ public void releaseLock(InterProcessMutex lock) { if (lock != null && lock.isAcquiredInThisProcess()) { try { lock.release(); } catch (Exception e) { throw new RuntimeException("释放分布式锁失败", e); } } } }2.4.2 分布式锁注解
package com.example.zookeeper.lock; import java.lang.annotation.*; import java.util.concurrent.TimeUnit; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface ZookeeperLock { /** * 锁的路径 */ String lockPath(); /** * 等待时间,默认5秒 */ long waitTime() default 5; /** * 时间单位,默认秒 */ TimeUnit timeUnit() default TimeUnit.SECONDS; }2.4.3 分布式锁切面
package com.example.zookeeper.lock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Aspect @Component public class ZookeeperLockAspect { @Autowired private DistributedLock distributedLock; @Pointcut("@annotation(com.example.zookeeper.lock.ZookeeperLock)") public void lockPointcut() {} @Around("lockPointcut()") public Object around(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); ZookeeperLock lockAnnotation = signature.getMethod().getAnnotation(ZookeeperLock.class); String lockPath = lockAnnotation.lockPath(); long waitTime = lockAnnotation.waitTime(); java.util.concurrent.TimeUnit timeUnit = lockAnnotation.timeUnit(); InterProcessMutex lock = null; try { // 获取锁 lock = distributedLock.acquireLock(lockPath, waitTime, timeUnit); if (lock == null) { throw new RuntimeException("获取分布式锁超时"); } // 执行原方法 return joinPoint.proceed(); } finally { // 释放锁 if (lock != null) { distributedLock.releaseLock(lock); } } } }2.5 实现配置中心
2.5.1 配置监听器
package com.example.zookeeper.configcenter; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class ConfigWatcher { @Autowired private CuratorFramework curatorFramework; private static final String CONFIG_PATH = "/config"; @PostConstruct public void init() throws Exception { // 确保配置节点存在 if (curatorFramework.checkExists().forPath(CONFIG_PATH) == null) { curatorFramework.create().creatingParentsIfNeeded().forPath(CONFIG_PATH, "default config".getBytes()); } // 创建节点缓存 NodeCache nodeCache = new NodeCache(curatorFramework, CONFIG_PATH); nodeCache.start(true); // 添加监听器 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { byte[] data = nodeCache.getCurrentData().getData(); String config = new String(data); System.out.println("配置已更新: " + config); // 这里可以触发配置刷新逻辑 } }); } /** * 获取配置 */ public String getConfig() throws Exception { byte[] data = curatorFramework.getData().forPath(CONFIG_PATH); return new String(data); } /** * 更新配置 */ public void updateConfig(String config) throws Exception { curatorFramework.setData().forPath(CONFIG_PATH, config.getBytes()); } }2.6 实现服务注册与发现
2.6.1 服务注册
package com.example.zookeeper.registry; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.UriSpec; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.UUID; @Component public class ServiceRegistry { @Autowired private CuratorFramework curatorFramework; @Value("${server.port}") private int port; @Value("${spring.application.name}") private String serviceName; private ServiceDiscovery<ServiceInfo> serviceDiscovery; private ServiceInstance<ServiceInfo> serviceInstance; @PostConstruct public void init() throws Exception { // 创建服务实例 serviceInstance = ServiceInstance.<ServiceInfo>builder() .name(serviceName) .port(port) .address(getLocalIp()) .payload(new ServiceInfo(serviceName, "1.0.0")) .uriSpec(new UriSpec("{scheme}://{address}:{port}")) .id(UUID.randomUUID().toString()) .build(); // 创建服务发现 serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class) .client(curatorFramework) .basePath("/services") .serializer(new JsonInstanceSerializer<>(ServiceInfo.class)) .thisInstance(serviceInstance) .build(); // 注册服务 serviceDiscovery.start(); System.out.println("服务注册成功: " + serviceInstance); } @PreDestroy public void destroy() throws Exception { if (serviceDiscovery != null) { serviceDiscovery.close(); System.out.println("服务注销成功"); } } private String getLocalIp() { try { return java.net.InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { return "127.0.0.1"; } } // 服务信息类 public static class ServiceInfo { private String serviceName; private String version; public ServiceInfo() {} public ServiceInfo(String serviceName, String version) { this.serviceName = serviceName; this.version = version; } // getters and setters public String getServiceName() { return serviceName; } public void setServiceName(String serviceName) { this.serviceName = serviceName; } public String getVersion() { return version; } public void setVersion(String version) { this.version = version; } } }2.6.2 服务发现
package com.example.zookeeper.registry; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @Component public class ServiceDiscoveryClient { @Autowired private CuratorFramework curatorFramework; private ServiceDiscovery<ServiceRegistry.ServiceInfo> serviceDiscovery; @PostConstruct public void init() throws Exception { serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceRegistry.ServiceInfo.class) .client(curatorFramework) .basePath("/services") .serializer(new JsonInstanceSerializer<>(ServiceRegistry.ServiceInfo.class)) .build(); serviceDiscovery.start(); } /** * 获取所有服务实例 */ public Collection<ServiceInstance<ServiceRegistry.ServiceInfo>> getAllInstances(String serviceName) throws Exception { return serviceDiscovery.queryForInstances(serviceName); } /** * 获取服务实例地址列表 */ public List<String> getServiceAddresses(String serviceName) throws Exception { return getAllInstances(serviceName).stream() .map(instance -> instance.getAddress() + ":" + instance.getPort()) .collect(Collectors.toList()); } /** * 获取一个可用的服务实例 */ public ServiceInstance<ServiceRegistry.ServiceInfo> getOneInstance(String serviceName) throws Exception { Collection<ServiceInstance<ServiceRegistry.ServiceInfo>> instances = getAllInstances(serviceName); if (instances.isEmpty()) { throw new RuntimeException("未找到可用的服务实例: " + serviceName); } // 简单的负载均衡:随机选择一个 return instances.iterator().next(); } }2.7 控制器示例
package com.example.zookeeper.controller; import com.example.zookeeper.lock.ZookeeperLock; import com.example.zookeeper.registry.ServiceDiscoveryClient; import org.apache.curator.x.discovery.ServiceInstance; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.HashMap; import java.util.Map; @RestController @RequestMapping("/api") public class DemoController { @Autowired private ServiceDiscoveryClient serviceDiscoveryClient; @GetMapping("/lock-demo") @ZookeeperLock(lockPath = "/lock/order", waitTime = 3) public Map<String, Object> lockDemo() { Map<String, Object> result = new HashMap<>(); result.put("code", 200); result.put("message", "分布式锁测试成功"); result.put("timestamp", System.currentTimeMillis()); return result; } @GetMapping("/services") public Map<String, Object> getServices(@RequestParam String serviceName) throws Exception { Map<String, Object> result = new HashMap<>(); ServiceInstance<com.example.zookeeper.registry.ServiceRegistry.ServiceInfo> instance = serviceDiscoveryClient.getOneInstance(serviceName); result.put("code", 200); result.put("service", instance); result.put("address", instance.getAddress() + ":" + instance.getPort()); return result; } @GetMapping("/health") public Map<String, Object> health() { Map<String, Object> result = new HashMap<>(); result.put("status", "UP"); result.put("timestamp", System.currentTimeMillis()); return result; } }2.8 主启动类
package com.example.zookeeper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ZookeeperApplication { public static void main(String[] args) { SpringApplication.run(ZookeeperApplication.class, args); System.out.println("SpringBoot整合Zookeeper示例已启动"); } }三、集群部署配置
3.1 Zookeeper集群配置
3.1.1 zoo.cfg配置
# zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper clientPort=2181 maxClientCnxns=60 autopurge.snapRetainCount=3 autopurge.purgeInterval=1 # 集群配置 server.1=192.168.1.101:2888:3888 server.2=192.168.1.102:2888:3888 server.3=192.168.1.103:2888:38883.1.2 创建myid文件
# 在每台服务器的dataDir目录下创建myid文件 echo 1 > /data/zookeeper/myid # 第一台服务器 echo 2 > /data/zookeeper/myid # 第二台服务器 echo 3 > /data/zookeeper/myid # 第三台服务器3.2 SpringBoot应用配置
3.2.1 多环境配置
# application-cluster.yml zookeeper: connect-string: 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181 session-timeout: 5000 connection-timeout: 5000 base-sleep-time: 1000 max-retries: 3 max-sleep-time: 5000 namespace: springboot-cluster3.2.2 启动脚本
#!/bin/bash # start-cluster.sh # 启动第一个实例 nohup java -jar springboot-zookeeper-demo.jar \ --spring.profiles.active=cluster \ --server.port=8080 & # 启动第二个实例 nohup java -jar springboot-zookeeper-demo.jar \ --spring.profiles.active=cluster \ --server.port=8081 & # 启动第三个实例 nohup java -jar springboot-zookeeper-demo.jar \ --spring.profiles.active=cluster \ --server.port=8082 & echo "集群启动完成"四、测试验证
4.1 创建测试类
package com.example.zookeeper; import org.apache.curator.framework.CuratorFramework; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class ZookeeperApplicationTests { @Autowired private CuratorFramework curatorFramework; @Test void testConnection() throws Exception { // 测试连接 String path = "/test"; String data = "Hello Zookeeper"; // 创建节点 if (curatorFramework.checkExists().forPath(path) == null) { curatorFramework.create().creatingParentsIfNeeded().forPath(path, data.getBytes()); } // 读取数据 byte[] bytes = curatorFramework.getData().forPath(path); System.out.println("节点数据: " + new String(bytes)); // 更新数据 curatorFramework.setData().forPath(path, "Updated Data".getBytes()); // 删除节点 curatorFramework.delete().forPath(path); System.out.println("Zookeeper连接测试通过"); } }4.2 分布式锁测试
package com.example.zookeeper; import com.example.zookeeper.lock.DistributedLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @SpringBootTest class DistributedLockTest { @Autowired private DistributedLock distributedLock; private int counter = 0; @Test void testDistributedLock() throws InterruptedException { int threadCount = 10; ExecutorService executorService = Executors.newFixedThreadPool(threadCount); CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { executorService.submit(() -> { InterProcessMutex lock = distributedLock.acquireLock("/test/lock", 5, TimeUnit.SECONDS); try { if (lock != null) { // 模拟业务操作 System.out.println(Thread.currentThread().getName() + " 获取锁成功"); Thread.sleep(100); counter++; } else { System.out.println(Thread.currentThread().getName() + " 获取锁失败"); } } catch (Exception e) { e.printStackTrace(); } finally { distributedLock.releaseLock(lock); latch.countDown(); } }); } latch.await(); executorService.shutdown(); System.out.println("最终计数器值: " + counter); assert counter == threadCount; } }五、总结
5.1 技术要点总结
- Zookeeper核心概念:
- Zookeeper提供分布式协调服务,基于ZAB协议保证数据一致性
- 采用树形节点结构,支持临时节点、顺序节点等特性
- 提供Watch机制实现事件通知
- Curator框架优势:
- 简化了Zookeeper客户端的操作
- 提供了丰富的分布式锁、选举、缓存等高级功能
- 内置了重试机制和连接管理
- 分布式锁实现:
- 使用InterProcessMutex实现可重入分布式锁
- 通过AOP实现注解式锁管理
- 支持锁超时和自动释放
- 服务注册与发现:
- 基于ServiceDiscovery实现服务自动注册
- 支持服务健康检查和负载均衡
- 实现配置的动态监听和更新
5.2 最佳实践建议
- 连接管理:
- 使用连接池管理Zookeeper连接
- 合理设置超时时间和重试策略
- 监控连接状态,实现自动重连
- 节点设计:
- 合理规划节点路径,避免节点过多
- 及时清理临时节点和不再使用的数据
- 使用命名空间隔离不同环境
- 集群部署:
- 建议使用奇数个节点(3或5)组成集群
- 确保节点间网络通畅
- 定期备份数据和快照
- 性能优化:
- 避免频繁的节点创建和删除
- 合理使用Watch机制,避免过多监听
- 使用异步接口处理耗时操作
5.3 适用场景
- 分布式锁:秒杀系统、订单处理等需要互斥访问的场景
- 配置中心:动态配置管理,配置热更新
- 服务注册发现:微服务架构中的服务治理
- 分布式协调:分布式任务调度、主从选举
- 命名服务:分布式系统中的服务寻址
5.4 注意事项
- 数据一致性:Zookeeper保证最终一致性,但不是强一致性
- 性能限制:Zookeeper不适合存储大量数据,主要用于协调信息
- 脑裂问题:网络分区时可能出现脑裂,需要合理配置超时时间
- 版本兼容:注意客户端和服务端版本兼容性
通过以上实现,我们成功将SpringBoot与Zookeeper集成,实现了分布式锁、配置中心、服务注册发现等核心功能,为构建分布式系统提供了可靠的基础设施支持。在实际应用中,可以根据具体需求选择合适的功能模块,并进行适当的优化和扩展。
谢谢你看我的文章,既然看到这里了,如果觉得不错,随手点个赞、转发、在看三连吧,感谢感谢。那我们,下次再见。
您的一键三连,是我更新的最大动力,谢谢
山水有相逢,来日皆可期,谢谢阅读,我们再会
我手中的金箍棒,上能通天,下能探海