文章目录
- 1 CountDownLatch:倒计时门闩,等待多线程完成的利器
- 1.1 核心概念与API
- 1.2 实战案例:多数据源加载
- 1.3 原理解析
- 1.4 注意事项
- 2 CyclicBarrier:循环屏障,多阶段任务协调的得力助手
- 2.1 核心概念与API
- 2.2 实战案例:多阶段计算任务
- 2.3 原理解析
- 2.4 注意事项
- 3 Semaphore:信号量,控制并发访问的流量警察
- 3.1 核心概念与API
- 3.2 实战案例:数据库连接池
- 3.3 其他方法
- 3.4 公平性与性能优化
- 4 三大同步工具对比与选型指南
- 4.1 核心区别对比
- 4.2 选型指南
- 5 总结
- 参考资料
大家好,我是你们的技术老友科威舟,今天给大家分享一下Java并发编程三大同步工具:用CountDownLatch、CyclicBarrier、Semaphore。
多个线程之间的协作,如同团队中的配合,需要一套清晰的规则和信号机制。
在日常开发中,我们经常会遇到需要多个线程协同工作的场景。比如,主线程需要等待几个工作线程都完成初始化后才能继续执行,或者需要控制同时访问某个资源的线程数量。
Java并发包中提供了三个强大的同步工具类:CountDownLatch、CyclicBarrier和Semaphore,它们就像是线程世界的交通信号灯,让混乱的并发执行变得有序可控。今天,我们就来深入剖析这三个工具的使用方法和实战场景。
1 CountDownLatch:倒计时门闩,等待多线程完成的利器
CountDownLatch可以理解为一个倒计时器,它允许一个或多个线程等待其他线程完成操作后再继续执行。这就像团队组织旅游,导游要等到所有游客都集合完毕后才能出发前往下一个景点。
1.1 核心概念与API
CountDownLatch的工作原理基于一个计数器:创建CountDownLatch时指定一个正整数作为初始计数值,每当有线程完成自己的任务时,计数器减1,当计数器值为0时,表示所有线程已完成任务,此时等待的线程可以被唤醒继续执行。
主要API包括:
CountDownLatch(int count):构造方法,初始化计数器值await():等待计数器归零,会阻塞当前线程await(long timeout, TimeUnit unit):带超时的等待countDown():计数器减1getCount():获取当前计数器的值
1.2 实战案例:多数据源加载
假设我们需要从多个数据源加载数据,只有所有数据都加载完成后,才能进行数据整合和展示:
importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;publicclassDataLoadingExample{publicstaticvoidmain(String[]args)throwsInterruptedException{// 创建CountDownLatch,计数器设置为3(对应三个数据源)CountDownLatchlatch=newCountDownLatch(3);ExecutorServiceexecutor=Executors.newFixedThreadPool(3);// 记录开始时间longstart=System.currentTimeMillis();// 模拟从三个不同数据源加载数据executor.submit(()->loadDataFromSource("数据库",3000,latch));executor.submit(()->loadDataFromSource("Redis",2000,latch));executor.submit(()->loadDataFromSource("API",1500,latch));System.out.println("等待所有数据源加载完成...");// 等待所有数据源加载完成latch.await();// 记录完成时间longend=System.currentTimeMillis();System.out.println("所有数据源加载完成,总耗时: "+(end-start)+"ms");System.out.println("开始处理整合的数据...");executor.shutdown();}privatestaticvoidloadDataFromSource(Stringsource,intsleepTime,CountDownLatchlatch){try{System.out.println("开始从"+source+"加载数据...");// 模拟耗时操作Thread.sleep(sleepTime);System.out.println(source+"数据加载完成");}catch(InterruptedExceptione){e.printStackTrace();}finally{// 完成后计数器减1latch.countDown();System.out.println(source+"数据源加载完成,通知CountDownLatch计数器减1,当前计数:"+latch.getCount());}}}执行结果可以看到,尽管三个数据源的加载时间不同,但主线程会等待所有数据源都加载完成后才继续执行。
1.3 原理解析
CountDownLatch内部基于AQS(AbstractQueuedSynchronizer)实现,使用AQS的共享模式。当创建一个CountDownLatch实例时,传入的初始计数值会被保存在AQS的state变量中:
await()方法会先检查state是否为0,如果是则继续执行,否则将当前线程加入等待队列countDown()方法会将state值减1,当减到0时,会唤醒所有在await()上等待的线程
1.4 注意事项
- 计数器不能重置:CountDownLatch的计数器无法重置,一旦计数到0,就不能再用了
- 只能等待一次性事件:适合等待一次性事件,不适合周期性重复的场景
- 注意处理中断异常:
await()方法会抛出InterruptedException - 可能导致永久等待:如果某个任务没有正确调用
countDown(),可能导致等待线程永远阻塞
2 CyclicBarrier:循环屏障,多阶段任务协调的得力助手
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行。
这就像几个朋友约定一起吃饭,必须所有人都到齐后才会开始点餐,只要有人没到,先到的人就得耐心等待。
2.1 核心概念与API
与CountDownLatch不同,CyclicBarrier可以重复使用,特别适合分阶段任务的场景。
主要API包括:
CyclicBarrier(int parties):创建一个屏障,等待指定数量的线程CyclicBarrier(int parties, Runnable barrierAction):创建一个屏障,并在所有线程到达时执行barrierActionawait():等待所有线程到达屏障点await(long timeout, TimeUnit unit):带超时的等待reset():重置屏障getNumberWaiting():获取当前在屏障处等待的线程数isBroken():查询屏障是否被破坏
2.2 实战案例:多阶段计算任务
假设我们有一个复杂的计算任务,可以分成三个阶段,每个阶段都需要多个线程协同计算,只有当所有线程完成当前阶段后,才能一起进入下一阶段:
importjava.util.concurrent.BrokenBarrierException;importjava.util.concurrent.CyclicBarrier;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;publicclassMultiStageCalculation{publicstaticvoidmain(String[]args){intworkerCount=3;// 创建CyclicBarrier,所有线程到达屏障时会执行指定的操作CyclicBarrierbarrier=newCyclicBarrier(workerCount,()->{System.out.println("======= 所有线程完成当前阶段,准备进入下一阶段 =======");});ExecutorServiceexecutor=Executors.newFixedThreadPool(workerCount);for(inti=0;i<workerCount;i++){finalintworkerId=i;executor.submit(()->{try{// 第一阶段:数据准备System.out.println("工作线程"+workerId+"开始准备数据...");Thread.sleep(1000+(int)(Math.random()*1000));System.out.println("工作线程"+workerId+"完成数据准备");barrier.await();// 等待所有线程完成数据准备// 第二阶段:数据计算System.out.println("工作线程"+workerId+"开始数据计算...");Thread.sleep(2000+(int)(Math.random()*1000));System.out.println("工作线程"+workerId+"完成数据计算");barrier.await();// 等待所有线程完成数据计算// 第三阶段:结果汇总System.out.println("工作线程"+workerId+"开始汇总结果...");Thread.sleep(1000+(int)(Math.random()*1000));System.out.println("工作线程"+workerId+"完成结果汇总");barrier.await();// 等待所有线程完成结果汇总System.out.println("工作线程"+workerId+"所有任务完成!");}catch(InterruptedException|BrokenBarrierExceptione){e.printStackTrace();}});}executor.shutdown();}}在这个例子中,三个工作线程需要协同完成三个阶段的任务。每个阶段完成后,线程都会在屏障处等待其他线程,只有当所有线程都到达屏障点后,才会一起进入下一阶段。
2.3 原理解析
CyclicBarrier基于ReentrantLock和Condition实现,与CountDownLatch直接使用AQS不同:
- 内部使用ReentrantLock和Condition实现线程同步
- 维护计数器,记录还未到达屏障的线程数
- 当线程调用
await()时,计数器减1 - 如果计数器不为0,当前线程进入等待状态
- 当最后一个线程到达屏障点,计数器变为0
- 执行屏障动作(如果有)
- 重置计数器为初始值
- 唤醒所有等待的线程
2.4 注意事项
- 可以重复使用:与CountDownLatch不同,CyclicBarrier可以通过自动重置或手动调用
reset()方法重置 - 必须所有线程都调用await():如果有线程没有调用
await(),可能导致其他线程永久等待 - 处理中断和超时情况:
await()方法会抛出InterruptedException和BrokenBarrierException - 注意屏障破坏:当有线程中断或超时,屏障会被破坏(broken),需要调用
reset()重置
3 Semaphore:信号量,控制并发访问的流量警察
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程以保证合理的使用公共资源。
可以把Semaphore想象成一个停车场的入口系统:停车场只有有限的车位(许可),当车位已满时,新来的车辆需要等待,直到有车辆离开空出车位。
3.1 核心概念与API
Semaphore维护了一个许可证集合,线程在访问资源前必须获取许可,用完后释放。
主要API包括:
Semaphore(int permits):创建指定许可数的信号量,默认非公平模式Semaphore(int permits, boolean fair):创建信号量,可指定是否公平acquire():获取一个许可,如果没有可用的许可则阻塞acquire(int permits):获取指定数量的许可tryAcquire():尝试获取许可,立即返回成功或失败tryAcquire(long timeout, TimeUnit unit):带超时的尝试获取release():释放一个许可release(int permits):释放指定数量的许可availablePermits():返回当前可用的许可数
3.2 实战案例:数据库连接池
假设我们要设计一个简单的数据库连接池,限制同时活动的连接数,防止连接数过多导致数据库压力过大:
importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.Semaphore;importjava.util.concurrent.atomic.AtomicInteger;publicclassDatabaseConnectionPool{// 连接池大小privatefinalintMAX_CONNECTIONS=10;// 信号量,控制允许的并发连接数privatefinalSemaphoresemaphore;// 连接列表privatefinalList<JdbcConnection>connectionList=newArrayList<>();// 当前已创建的连接数privatefinalAtomicIntegercreatedConnections=newAtomicInteger(0);publicDatabaseConnectionPool(){// 创建信号量,设置最大许可数为连接池大小this.semaphore=newSemaphore(MAX_CONNECTIONS);}// 获取数据库连接publicJdbcConnectiongetConnection()throwsInterruptedException{// 获取许可semaphore.acquire();// 获取或创建连接returngetOrCreateConnection();}// 释放连接publicvoidreleaseConnection(JdbcConnectionconnection){if(connection!=null){// 将连接放回池中returnConnectionToPool(connection);// 释放许可semaphore.release();System.out.println("释放连接: "+connection.getId()+", 释放许可后当前可用许可: "+semaphore.availablePermits());}}// 获取或创建连接(简化示例,实际需要同步控制)privatesynchronizedJdbcConnectiongetOrCreateConnection(){// 首先尝试从池中获取可用连接for(JdbcConnectionconn:connectionList){if(!conn.isInUse()){conn.setInUse(true);System.out.println("复用连接: "+conn.getId()+", 当前可用许可: "+semaphore.availablePermits());returnconn;}}// 如果没有可用连接且未达到最大连接数,则创建新连接if(createdConnections.get()<MAX_CONNECTIONS){JdbcConnectionnewConn=createNewConnection();connectionList.add(newConn);System.out.println("创建新连接: "+newConn.getId()+", 当前可用许可: "+semaphore.availablePermits());returnnewConn;}// 这种情况理论上不会发生,因为Semaphore控制了并发量thrownewIllegalStateException("无法获取连接");}// 创建新连接privateJdbcConnectioncreateNewConnection(){intid=createdConnections.incrementAndGet();// 模拟创建JDBC连接returnnewJdbcConnection(id);}// 将连接放回池中privatevoidreturnConnectionToPool(JdbcConnectionconnection){connection.setInUse(false);}// 模拟数据库连接类publicstaticclassJdbcConnection{privatefinalintid;privatebooleaninUse;publicJdbcConnection(intid){this.id=id;this.inUse=true;}publicintgetId(){returnid;}publicbooleanisInUse(){returninUse;}publicvoidsetInUse(booleaninUse){this.inUse=inUse;}// 模拟执行SQLpublicvoidexecuteQuery(Stringsql){System.out.println("连接"+id+"执行SQL: "+sql);}}}在这个连接池实现中,Semaphore用于控制同时获取数据库连接的线程数量,防止过多的连接导致数据库压力过大。
3.3 其他方法
Semaphore还提供了一些有用的方法:
int availablePermits():返回此信号量中当前可用的许可证数int getQueueLength():返回正在等待获取许可证的线程数boolean hasQueuedThreads():是否有线程正在等待获取许可证void reducePermits(int reduction):减少reduction个许可证(protected方法)Collection getQueuedThreads():返回所有等待获取许可证的线程集合(protected方法)
3.4 公平性与性能优化
Semaphore有公平和非公平两种模式。公平模式下,线程按照请求的顺序获取许可;非公平模式下,线程可能随机获取许可。
- 非公平模式:通常具有更高的吞吐量,因为它允许更多的线程尝试获取许可
- 公平模式:保证先请求的线程先获得许可,避免线程饥饿
在实际应用中,需要根据具体的场景和需求来选择公平或非公平模式。如果对执行顺序有严格要求,应选择公平模式;如果追求高吞吐量,非公平模式可能是更好的选择。
4 三大同步工具对比与选型指南
了解了这三个工具后,我们来总结一下它们的特点和适用场景:
4.1 核心区别对比
| 特性 | CountDownLatch | CyclicBarrier | Semaphore |
|---|---|---|---|
| 是否可重用 | 不可重用,一次性 | 可重用,自动/手动重置 | 可重用,许可可重复获取释放 |
| 主要用途 | 等待一个或多个事件完成 | 多个线程相互等待到屏障点 | 控制同时访问资源的线程数 |
| 计数器变化 | 递减,到0释放等待线程 | 递增到设定值释放所有线程 | 许可数获取时减少,释放时增加 |
| 底层实现 | 基于AQS共享模式 | 基于ReentrantLock和Condition | 基于AQS共享模式 |
4.2 选型指南
- 选择CountDownLatch当:需要等待一个或多个事件完成后再继续执行,且事件是一次性的
- 选择CyclicBarrier当:多个线程需要相互等待,到达一个公共屏障点后才能继续执行,特别是多阶段任务
- 选择Semaphore当:需要控制同时访问特定资源的线程数量,进行流量控制
5 总结
CountDownLatch、CyclicBarrier和Semaphore是Java并发编程中不可或缺的三个同步工具类,它们分别解决了不同的线程协调问题。
- CountDownLatch像是旅行团的导游,要等到所有游客(线程)都完成某个动作(如集合)后,才能继续下一步行程
- CyclicBarrier像是一群朋友约定吃饭,必须所有人都到齐后才会开始点餐,而且这种约定可以多次进行(多阶段任务)
- Semaphore像是停车场的入口系统,只有有空车位(许可)时才会允许车辆进入,车位满时需要等待
在实际开发中,根据具体场景选择合适的同步工具,可以大大简化并发编程的复杂度,提高程序的可靠性和性能。希望本文能帮助读者更好地理解和运用这些强大的并发工具类。
参考资料
- https://blog.51cto.com/u_17372029/13862875
- http://bbs.huaweicloud.com/blogs/453005
- https://blog.csdn.net/m0_57781768/article/details/133394638
- https://blog.51cto.com/u_16213699/13141222
- https://blog.csdn.net/weixin_34062329/article/details/94097526
- https://developer.aliyun.com/article/1502102
- https://developer.aliyun.com/article/1326526
- https://developer.aliyun.com/article/1593807
- https://www.cnblogs.com/nuccch/p/17558832.html
希望这篇文章能帮助你更好地理解和应用Java并发编程中的三大同步工具类!如果有任何问题或建议,欢迎在评论区留言讨论。
更多技术干货欢迎关注微信公众号科威舟的AI笔记~
【转载须知】:转载请注明原文出处及作者信息