手写阻塞队列的线程安全设计:这段MyBlockingQueue到底“安全”在哪?
这份MyBlockingQueue是典型的循环数组 + 阻塞语义实现:data[]存元素,head/tail控制出队/入队位置,size记录当前元素个数。并发场景里,“线程安全”要同时满足两件事:
- 数据结构不被写坏:任何时刻都要维持队列不变量:
0 <= size <= capacity,head/tail始终落在合法范围内,入队不会覆盖未消费元素,出队不会重复消费同一元素。 - 阻塞语义正确:满了
put必须等,空了take必须等;并且等待要能被可靠唤醒,唤醒后行为仍然正确。
下面按代码里的关键点逐个拆开讲:哪些地方做了线程安全处理、为什么必须这么做。
源代码:
classMyBlockingQueue{privateString[]data=null;privateinthead=0;privateinttail=0;privateintsize=0;publicMyBlockingQueue(intcapacity){data=newString[capacity];}publicvoidput(Stringelem)throwsInterruptedException{synchronized(this){while(size>=data.length){//这里用while不用if的原因跟下面一样//队列满了,需要阻塞//return;this.wait();}data[tail]=elem;tail++;if(tail>=data.length){tail=0;}size++;this.notify();}}publicStringtake()throwsInterruptedException{synchronized(this){while(size==0){//return null;this.wait();//用while不用if是为了多次验证当前这里的条件是否成立//wait唤醒之前跟之后都判定一次,主要目的在之后这一次}Stringret=data[head];head++;if(head>=data.length){head=0;}size--;this.notify();returnret;}}}1)synchronized (this):把共享状态的读写变成互斥的“原子区间”
put()和take()的核心逻辑都包在:
synchronized(this){...}这一步是线程安全的“地基”。原因在于:入队/出队都不是单条指令,而是一串复合操作:
- 检查条件(满/空)
- 写/读数组槽位
- 移动
head/tail(并处理回环) - 修改
size - 唤醒等待线程
如果没有互斥,两个线程交错执行会出现非常具体的灾难:
- 丢失更新:两个线程同时
size++或size--,最终只生效一次 - 覆盖数据:两个生产者同时写入同一个
tail位置,覆盖彼此元素 - 重复消费:两个消费者同时从同一个
head位置取值 - 指针错乱:
head/tail推进时被打断,导致结构性破坏(队列不变量失真)
synchronized (this)的意义就是:同一时刻只允许一个线程修改data/head/tail/size这组共享状态,把“复合操作”提升为一个对外不可分割的临界区,从而保证原子性与一致性。
2)wait():满/空时阻塞等待,并且关键地——释放锁
put 里的等待(队列满)
while(size>=data.length){this.wait();}take 里的等待(队列空)
while(size==0){this.wait();}这里的线程安全点不只是“阻塞”,而是wait()的组合语义:
- 当前线程进入等待状态
- 释放当前持有的这把锁(this 的监视器锁)
- 之后被唤醒时,会先重新竞争锁,拿到锁后才继续执行
释放锁这件事至关重要:
如果队列满时生产者不释放锁,消费者永远进不来做take(),队列永远满;如果队列空时消费者不释放锁,生产者永远进不来做put(),队列永远空。于是就变成“抱着锁睡觉”,全员卡死。
所以wait()是一种条件同步:不满足条件时把机会让出去,让别的线程进入临界区改变条件。
顺带一提:
wait()必须在持有该对象监视器锁的情况下调用(也就是必须在对应synchronized内),否则会直接抛异常。这里把wait放在同步块内是正确姿势。
3)while不用if:被唤醒 ≠ 条件已经满足
代码里两处都坚持用while:
put:while (size >= data.length) wait()take:while (size == 0) wait()
这不是“风格问题”,而是正确性底线。
原因很现实:线程从wait()醒来后,并不是立刻继续执行,而是要重新抢锁。在重新抢锁、以及重新进入临界区之前,队列状态可能已经被其他线程改过了。于是“醒来时的世界”不一定还是“刚被唤醒时以为的世界”。
再说得更直接一点:
- 可能发生虚假唤醒(spurious wakeup):线程莫名其妙醒了,但条件没变
- 多线程场景下,可能被唤醒的那一刻条件短暂满足,抢到锁时又不满足了
notifyAll场景下会唤醒一批线程,其中只有少数真的能继续,其余必须回去再等
因此正确模板永远是:条件谓词 + while 循环等待。醒来后再检查一次条件,条件不满足就继续wait(),这样才能保证不变量不被破坏。
4)notify():状态改变后“叫醒”对方继续推进
在put()成功入队后:
size++;this.notify();在take()成功出队后:
size--;this.notify();这一块做的是“唤醒机制”:当队列状态发生变化(空→非空 或 满→非满),需要通知在该锁对象上等待的线程,否则可能出现:
- 队列已经有元素了,消费者还一直睡着(死等)
- 队列已经有空位了,生产者还一直睡着(死等)
并且notify()放在同步块内、且在状态修改之后调用,也是关键:
- 必须先把
size/head/tail/data变更落实,唤醒线程抢到锁后才能看到正确状态 - 唤醒并不等于立刻放锁;只有退出
synchronized后才会真正释放锁,让被唤醒线程继续
5)这份实现“安全”到什么程度?一个典型改进点:notifyvsnotifyAll
这份实现靠synchronized + while(wait) + notify已经能保证基本正确性,但在“多生产者 + 多消费者”更通用的场景里,有一个经常被拿来打磨的点:用notifyAll()替代notify()。
为什么notify()可能不够稳
notify()只随机唤醒一个等待线程,而等待线程可能是两类:
- 生产者在等“队列非满”
- 消费者在等“队列非空”
如果现在发生的是“入队一次”(队列从空变为非空),理论上应该唤醒消费者;但notify()有可能恰好唤醒了某个生产者。生产者醒来后会因为while (size >= capacity)条件仍不满足而继续睡回去,这虽然不会破坏正确性(因为用了 while),但会造成更多无效唤醒与竞争,吞吐下降,甚至出现“体感卡顿”。
notifyAll()的价值
notifyAll()会唤醒所有在这把锁上等待的线程,让它们重新竞争锁;最终只有条件满足的线程能通过while检查进入临界区,其余会继续等待。由于 while 做了二次校验,正确性仍然稳。
把两处改成更通用的写法:
// put 成功后this.notifyAll();// take 成功后this.notifyAll();结论:
notifyAll()往往更“稳”,代价是可能唤醒更多线程产生额外上下文切换;notify()更“省”,但在多生产者多消费者混合等待下更依赖运气。
6)额外的小工程点(不影响主线,但值得顺手提)
这些不属于“线程安全核心”,但属于“把实现打磨得更像标准库”的细节:
take()取走元素后可以把槽位置data[head] = null;,避免旧引用滞留(尤其泛型对象时更有意义)- 目前实现固定
String[],工程里更常写成E[]泛型队列 - 只用一把锁(
this)能正确工作,但更复杂的实现会把“非空/非满”拆成不同条件队列(用ReentrantLock + Condition),减少无效唤醒,提高吞吐
总结:
这份阻塞队列之所以能在并发场景下保持正确,关键防线集中在四件事:
- 互斥:
synchronized (this)把共享状态的复合修改变成原子临界区 - 条件同步:队列满/空时用
wait()阻塞,并释放锁让对方改变条件 - 循环校验:用
while重试条件,防止虚假唤醒与竞争下的状态漂移 - 唤醒推进:状态变化后
notify/notifyAll叫醒等待线程继续工作流转