生产消费者模型
概念与作用
- 概念:它通过一个容器(缓冲区)来解决生产者和消费者之间的强耦合问题。
- 解耦:生产者只管生产,消费者只管消费,它们互不认识,只通过缓冲区交互。
- 支持并发:生产者和消费者可以在不同的线程甚至进程中并行工作。
- 忙闲不均: 如果生产者快(例如双11零点下单),缓冲区可以暂时存储请求,让消费者慢慢处理,防止系统崩溃。 如果消费者快,它可以歇一歇,等待数据到来,不浪费 CPU 空转。
"321" 原则
- 1 个中间场所:即 缓冲区 (Buffer) 或 阻塞队列。就像超市的货架,生产者把货放上去,消费者去拿。
- 2 个对象:生产者 :生成数据或任务的线程。消费者 :处理数据或任务的线程。
- 3 种关系:
- 生产者 vs 生产者:互斥关系。两个工厂不能同时往同一个货架的同一个位置放货,会把货架挤坏(数据覆盖 / 错乱)。需要加锁。
- 消费者 vs 消费者:互斥关系。两个顾客不能同时抢同一件商品,会打架(重复消费 数据竞争)。需要加锁。
- 生产者 vs 消费者:同步与互斥关系。互斥:不能一边放一边拿,容易出错。同步:货架满了,生产者必须等(阻塞),通知消费者来拿。货架空了,消费者必须等 (阻塞),通知生产者来补货。
基于条件变量的阻塞队列
#pragma once #include <queue> #include <mutex> #include <condition_variable> #include <iostream> template <typename T> class BlockQueue { private: std::queue<T> _q; // 1. 中间场所:缓冲区 size_t _cap; // 容量限制 std::mutex _mtx; // 互斥锁:保护队列安全 std::condition_variable _full; // 条件变量:队列满时,生产者等待 std::condition_variable _empty; // 条件变量:队列空时,消费者等待 public: BlockQueue(size_t cap = 5) : _cap(cap) {} // 生产者放入数据 void push(const T& in) { std::unique_lock<std::mutex> lock(_mtx); // 关键点:虚假唤醒 (Spurious Wakeup) // 必须使用 while 循环检查条件,而不是 if。 // 因为 wait 返回时,可能条件并不满足(例如被信号中断,或者多个线程同时被唤醒但资源被抢走)。 while (_q.size() >= _cap) { // 队列满了,生产者需要在 _full 上等待 // wait 会自动释放锁,被唤醒后会自动重新获取锁 _full.wait(lock); } _q.push(in); // 生产了数据,唤醒可能在等待的消费者 _empty.notify_one(); } // 消费者取出数据 void pop(T& out) { std::unique_lock<std::mutex> lock(_mtx); // 关键点:虚假唤醒 while (_q.empty()) { // 队列空了,消费者需要在 _empty 上等待 _empty.wait(lock); } out = _q.front(); _q.pop(); // 消费了数据,腾出了空间,唤醒可能在等待的生产者 _full.notify_one(); } bool is_empty() { std::unique_lock<std::mutex> lock(_mtx); return _q.empty(); } bool is_full() { std::unique_lock<std::mutex> lock(_mtx); return _q.size() == _cap; } };线程运行函数
#include <iostream> #include <thread> #include <unistd.h> #include <random> #include "BlockQueue.hpp" // 任务类型,可以是简单的 int,也可以是复杂的结构体 // 这里演示 int,因为使用了模板,后续可以轻松替换 typedef int Task; void producer(BlockQueue<Task>* bq) { // 随机数生成器 std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution<> dis(1, 100); while (true) { // 1. 生产数据 Task data = dis(gen); // 2. 推送到队列 bq->push(data); std::cout << "[生产者] 生产数据: " << data << std::endl; // 模拟生产耗时,稍微快一点,测试消费者处理不过来的情况(或者反之) usleep(500000); // 0.5秒 } } void consumer(BlockQueue<Task>* bq) { while (true) { Task data; // 1. 从队列获取数据 bq->pop(data); // 2. 处理数据 std::cout << " [消费者] 消费数据: " << data << std::endl; // 模拟消费耗时,稍微慢一点 sleep(1); } } int main() { // 创建一个容量为 5 的阻塞队列 BlockQueue<Task>* bq = new BlockQueue<Task>(5); // 创建线程 // 321原则中的 "2个对象":生产者和消费者线程 std::thread p(producer, bq); std::thread c(consumer, bq); p.join(); c.join(); delete bq; return 0; }结果展示
[生产者] 生产数据: 54 [消费者] 消费数据: 54 [生产者] 生产数据: 71 [消费者] 消费数据: 71 [生产者] 生产数据: 39 [生产者] 生产数据: 24 [消费者] 消费数据: 39 <-- 消费者处理慢,生产者多生产了几个 [生产者] 生产数据: 16 [生产者] 生产数据: 56基于posix信号量实现:
这是基于 固定大小数组 (vector 实现)的循环队列。
核心资源定义:
_space_sem:剩余空间数量。初始值 = 队列容量 (Cap)。_data_sem:可用数据数量。初始值 = 0。
生产者逻辑:
- P(
_space_sem):申请一个空位。如果没有空位(满),就在这里阻塞。 - Lock(
_p_mtx):加锁保护_p_step下标(防止多生产者冲突)。 - Write Data:写入数据。
- Unlock(
_p_mtx):解锁。 - V(
_data_sem):增加一个数据计数,唤醒消费者。
- P(
消费者逻辑:
- P(
_data_sem):申请一个数据。如果没有数据(空),就在这里阻塞。 - Lock(
_c_mtx):加锁保护_c_step下标(防止多消费者冲突)。 - Read Data:读取数据。
- Unlock(
_c_mtx):解锁。 - V(
_space_sem):增加一个空间计数,唤醒生产者。
- P(
#pragma once #include <vector> #include <semaphore.h> #include <mutex> #include <iostream> template <typename T> class RingQueue { private: std::vector<T> _ring; int _cap; int _p_step; // 生产者下标 int _c_step; // 消费者下标 sem_t _space_sem; // 生产者关注的空间资源(初始为容量) sem_t _data_sem; // 消费者关注的数据资源(初始为0) std::mutex _p_mtx; // 保护生产者的下标(支持多生产者) std::mutex _c_mtx; // 保护消费者的下标(支持多消费者) public: RingQueue(int cap = 5) : _ring(cap), _cap(cap), _p_step(0), _c_step(0) { // 初始化信号量 // pshared=0: 线程间共享 // value=_cap: 初始剩余空间为满 sem_init(&_space_sem, 0, _cap); // value=0: 初始数据量为0 sem_init(&_data_sem, 0, 0); } ~RingQueue() { sem_destroy(&_space_sem); sem_destroy(&_data_sem); } // 生产数据 void push(const T& in) { // 1. P操作:申请空间资源 (space--) // 如果 sem 值为 0,线程会在此阻塞,不需要 while 循环判空,信号量内部处理了 sem_wait(&_space_sem); // 2. 临界区:操作环形队列 { std::lock_guard<std::mutex> lock(_p_mtx); _ring[_p_step] = in; _p_step++; _p_step %= _cap; // 维持环形 } // 3. V操作:发布数据资源 (data++) // 唤醒可能在等待数据的消费者 sem_post(&_data_sem); } // 消费数据 void pop(T& out) { // 1. P操作:申请数据资源 (data--) // 如果 sem 值为 0,消费者在此阻塞 sem_wait(&_data_sem); // 2. 临界区:操作环形队列 { std::lock_guard<std::mutex> lock(_c_mtx); out = _ring[_c_step]; _c_step++; _c_step %= _cap; // 维持环形 } // 3. V操作:发布空间资源 (space++) // 唤醒可能在等待空间的生产者 sem_post(&_space_sem); } };对比两种实现
| 特性 | BlockQueue (条件变量 + Mutex) | RingQueue (信号量 + Mutex) |
|---|---|---|
| 底层机制 | queue (链表 / 动态数组) + cond_wait | vector (固定数组) + sem_wait |
| 空间管理 | 动态增长 (通常) | 固定大小 (预分配) |
| 同步逻辑 | 先加锁 -> 再检查条件 (while) -> 阻塞 | 先预判资源 (P 操作) -> 再加锁 -> 操作 |
| 锁的粒度 | 粗粒度:生产者和消费者共用一把大锁 (通常) | 细粒度:生产者锁 p_lock ,消费者锁 c_lock |
| 并发度 | 生产者和消费者互斥(因为操作同一个 queue) | 生产者和消费者可以并行(只要不空不满) |
| 适用场景 | 任务队列长度不确定,或者逻辑复杂 | 高性能、定长缓冲、流媒体处理 |
关键区别:
在 RingQueue 中,生产者和消费者各自拥有自己的锁 ( _p_mtx 和 _c_mtx )。如果队列 既不空也不满,生产者在 p_index 处写。消费者在 c_index 处读,互不干扰,完全并行! 这是环形队列最大的性能优势。
为什么阻塞队列不能直接搞两个锁?
答案: 直接用 std::queue 是 不行 的。
std::queue 底层默认是 std::deque (双端队列)或 std::vector 。有内存重排风险 :当你 push 时,底层容器可能空间不足,需要 扩容 (Reallocation)。这会申请新内存、搬移旧数据、释放旧内存。如果此时另一个线程正在 pop ,它持有的指针或迭代器会瞬间失效,导致程序崩溃。