StructBERT部署教程:用户评论情感分析系统
2026/1/11 16:38:06
AQS(AbstractQueuedSynchronizer)是 Java 并发包的核心组件,可以把它想象成一个万能排队管理器。它管理着一个虚拟的排队队列,让线程能够有序地获取和释放资源。
// 简化版 AQS 内部结构publicabstractclassAbstractQueuedSynchronizer{// 1. 状态变量 - 核心!privatevolatileintstate;// 2. 等待队列的节点staticfinalclassNode{Threadthread;// 等待的线程Nodeprev;// 前驱节点Nodenext;// 后继节点intwaitStatus;// 等待状态}// 3. 队列头尾privatetransientvolatileNodehead;privatetransientvolatileNodetail;}想象一个公共厕所:
state = 1:有一个坑位可用importjava.util.concurrent.locks.AbstractQueuedSynchronizer;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;/** * 自己实现的简易锁 - 基于 AQS */publicclassMySimpleLock{// 内部同步器privatestaticclassSyncextendsAbstractQueuedSynchronizer{// 尝试获取锁(CAS 设置 state 从 0 到 1)@OverrideprotectedbooleantryAcquire(intarg){// 使用 CAS 原子操作尝试获取锁if(compareAndSetState(0,1)){// 设置当前线程为独占所有者setExclusiveOwnerThread(Thread.currentThread());returntrue;}returnfalse;}// 尝试释放锁@OverrideprotectedbooleantryRelease(intarg){if(getState()==0){thrownewIllegalMonitorStateException();}// 清除独占所有者setExclusiveOwnerThread(null);// 注意:state 设置要在最后,保证可见性setState(0);returntrue;}// 是否被当前线程独占@OverrideprotectedbooleanisHeldExclusively(){returngetExclusiveOwnerThread()==Thread.currentThread();}}privatefinalSyncsync=newSync();publicvoidlock(){sync.acquire(1);// AQS 的模板方法}publicvoidunlock(){sync.release(1);// AQS 的模板方法}}publicclassAQSDemo{privatestaticintcount=0;privatestaticfinalMySimpleLocklock=newMySimpleLock();publicstaticvoidmain(String[]args)throwsInterruptedException{// 创建 10 个线程并发累加Thread[]threads=newThread[10];for(inti=0;i<10;i++){threads[i]=newThread(()->{for(intj=0;j<1000;j++){lock.lock();// 加锁try{count++;// 临界区操作}finally{lock.unlock();// 解锁}}});}// 启动所有线程for(Threadt:threads){t.start();}// 等待所有线程完成for(Threadt:threads){t.join();}System.out.println("最终结果: "+count);// 应该是 10000}}/** * 可重入锁实现 - 支持同一个线程重复加锁 */publicclassMyReentrantLock{privatestaticclassSyncextendsAbstractQueuedSynchronizer{// 获取锁@OverrideprotectedbooleantryAcquire(intacquires){Threadcurrent=Thread.currentThread();intc=getState();// 状态为 0,表示锁未被占用if(c==0){if(compareAndSetState(0,acquires)){setExclusiveOwnerThread(current);returntrue;}}// 锁已被占用,检查是否是当前线程(重入)elseif(current==getExclusiveOwnerThread()){intnextc=c+acquires;// 增加重入次数if(nextc<0){thrownewError("Maximum lock count exceeded");}setState(nextc);// 更新状态returntrue;}returnfalse;}// 释放锁@OverrideprotectedbooleantryRelease(intreleases){intc=getState()-releases;if(Thread.currentThread()!=getExclusiveOwnerThread()){thrownewIllegalMonitorStateException();}booleanfree=false;if(c==0){free=true;setExclusiveOwnerThread(null);}setState(c);returnfree;}// 创建 Condition 对象publicjava.util.concurrent.locks.ConditionnewCondition(){returnnewConditionObject();}}privatefinalSyncsync=newSync();publicvoidlock(){sync.acquire(1);}publicvoidunlock(){sync.release(1);}publicbooleantryLock(){returnsync.tryAcquire(1);}}/** * 信号量实现 - 控制同时访问的线程数 */publicclassMySemaphore{privatestaticclassSyncextendsAbstractQueuedSynchronizer{Sync(intpermits){setState(permits);// 初始化许可数量}// 获取许可@OverrideprotectedinttryAcquireShared(intacquires){for(;;){// 自旋intavailable=getState();intremaining=available-acquires;// 如果剩余许可不足,或者 CAS 成功if(remaining<0||compareAndSetState(available,remaining)){returnremaining;}}}// 释放许可@OverrideprotectedbooleantryReleaseShared(intreleases){for(;;){// 自旋intcurrent=getState();intnext=current+releases;if(next<current){thrownewError("Maximum permit count exceeded");}if(compareAndSetState(current,next)){returntrue;}}}}privatefinalSyncsync;publicMySemaphore(intpermits){sync=newSync(permits);}publicvoidacquire()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}publicvoidrelease(){sync.releaseShared(1);}}publicclassAQSApplications{publicstaticvoidmain(String[]args){// 1. ReentrantLock - 可重入锁ReentrantLocklock=newReentrantLock();// 2. Semaphore - 信号量Semaphoresemaphore=newSemaphore(5);// 5个许可// 3. CountDownLatch - 倒计时门闩CountDownLatchlatch=newCountDownLatch(3);// 4. ReentrantReadWriteLock - 读写锁ReentrantReadWriteLockrwLock=newReentrantReadWriteLock();// 5. CyclicBarrier - 循环屏障CyclicBarrierbarrier=newCyclicBarrier(3);}}acquire(int arg)- 独占式获取,忽略中断acquireInterruptibly(int arg)- 独占式获取,响应中断tryAcquireNanos(int arg, long nanos)- 带超时的获取release(int arg)- 独占式释放acquireShared(int arg)- 共享式获取releaseShared(int arg)- 共享式释放// 自定义同步器的通用模式classCustomSyncextendsAbstractQueuedSynchronizer{// 通常需要实现的方法:// 1. tryAcquire/tryRelease - 独占模式// 2. tryAcquireShared/tryReleaseShared - 共享模式// 3. isHeldExclusively - 是否独占// 然后暴露给外部使用:publicvoidcustomLock(){acquire(1);}publicvoidcustomUnlock(){release(1);}}AQS 是 Java 并发编程的基石,理解它对于掌握 Java 高并发编程至关重要。虽然日常开发中不常直接使用,但了解其原理能让你更好地使用并发工具,并在面试中脱颖而出!