【改进版】C++ 固定线程池实现:基于调用者运行的拒绝策略优化

张开发
2026/4/19 23:50:34 15 分钟阅读

分享文章

【改进版】C++ 固定线程池实现:基于调用者运行的拒绝策略优化
在高并发场景下线程池的任务队列满负载时的拒绝策略直接影响系统稳定性。本文基于之前实现的固定线程池优化了同步队列的Add函数逻辑实现了更健壮的调用者运行拒绝策略解决了任务提交阻塞、队列满时任务丢失等问题并对核心代码进行逐行解析。一、核心设计思路本次优化聚焦于同步队列SyncQueue的Add函数核心改进点引入超时等待机制避免任务提交无限阻塞明确返回值语义区分 “队列停止”“队列满”“任务添加成功” 三种状态线程池层根据返回值触发 “调用者运行” 策略保证任务不丢失完善队列空停止、线程安全等细节提升线程池鲁棒性。二、核心代码逐部分解析1. 同步队列SyncQueue_1.hpp任务存储与线程同步核心同步队列是线程池的任务缓冲区负责实现线程安全的任务入队 / 出队、队列状态管理本次核心优化集中在Add函数。1成员变量定义namespace tulun { static const size_t MaxTaskCount 500; template class T class SyncQueue { private: std::dequeT m_queue; // 任务存储容器deque兼顾头尾操作效率 std::mutex m_mutex; // 互斥锁保证队列操作线程安全 std::condition_variable m_notEmpty; // 队列非空条件变量消费者等待 std::condition_variable m_notFull; // 队列非满条件变量生产者等待 std::condition_variable m_waitStop; // 等待队列为空的停止条件变量 int m_waitTime 100; // 超时等待时间100ms避免无限阻塞 int m_maxSize; // 队列最大容量 bool m_needStop; // 队列停止标志 // ... 省略IsFull/IsEmpty工具函数 }; }m_notEmpty/m_notFull分别用于消费者等待任务、生产者等待队列空闲m_waitTime超时等待阈值解决 “队列满时生产者无限阻塞” 问题m_needStop全局停止标志控制队列生命周期。2核心优化Add 函数任务入队逻辑template class F int Add(F task) { std::unique_lockstd::mutex locker(m_mutex); // 带超时的条件等待队列非满 或 队列停止 auto tag m_notFull.wait_for( locker, std::chrono::milliseconds(m_waitTime), [this]() - bool { return m_needStop || !IsFull(); } ); // 返回值语义2-队列停止1-队列满超时0-添加成功 if (m_needStop) { return 2; // 队列已停止拒绝添加任务 } if(!tag) { return 1; // 超时且队列仍满触发拒绝策略 } // 任务入队唤醒等待的消费者 m_queue.push_back(std::forwardF(task)); m_notEmpty.notify_all(); return 0; // 任务添加成功 }关键改进解析替换原有的while循环超时判断改用wait_for 谓词的简洁写法逻辑更清晰明确返回值2队列已停止此时无需处理任务1队列满且超时触发 “调用者运行” 策略0任务成功入队使用std::forward完美转发任务支持左值 / 右值任务参数避免拷贝开销。3队列停止与等待空队列停止void Stop() { { std::unique_lockstd::mutex locker(m_mutex); m_needStop true; } m_notEmpty.notify_all(); // 唤醒所有等待的消费者 m_notFull.notify_all(); // 唤醒所有等待的生产者 } void WaitQueueEmptyStop() { std::unique_lockstd::mutex locker(m_mutex); // 循环等待队列为空每次超时100ms检查一次 while (!IsEmpty()) { m_waitStop.wait_for(locker, std::chrono::milliseconds(m_waitTime)); } m_needStop true; m_notFull.notify_all(); m_notEmpty.notify_all(); }Stop()强制停止队列立即唤醒所有阻塞的生产者 / 消费者WaitQueueEmptyStop()优雅停止等待队列中所有任务执行完毕后再停止避免任务丢失。2. 固定线程池FixedThreadPool.hpp/.cpp任务调度核心线程池基于SyncQueue实现任务提交、线程管理核心是 “调用者运行” 拒绝策略的落地。1线程池成员与构造函数namespace tulun { class FixedThreadPool { public: using TaskType std::functionvoid(void); // 统一任务类型 private: std::liststd::shared_ptrstd::thread m_threadgroup; // 线程组 tulun::SyncQueueTaskType m_queue; // 任务队列 std::atomicbool m_running; // 线程池运行标志 std::once_flag m_flag; // 保证Stop只执行一次 // ... 省略私有方法声明 }; // 构造函数初始化队列容量启动指定数量的工作线程 FixedThreadPool::FixedThreadPool(size_t m_TaskQueSize ,int numthreads) : m_queue(m_TaskQueSize), m_running(false) { Start(numthreads); } }TaskType统一封装任务为无参可调用对象兼容函数、绑定函数、lambda 等m_threadgroup存储工作线程智能指针自动管理线程生命周期std::atomicbool m_running原子变量保证线程安全的运行状态判断。2工作线程运行逻辑RunInThreadvoid FixedThreadPool::RunInThread() { while (m_running) { TaskType task; m_queue.Take(task); // 阻塞等待获取任务 if (m_running task) { LOG_INFOThread task; task(); // 执行任务 } } }工作线程循环从队列取任务若队列为空则阻塞在m_queue.Take(task)只有线程池运行且任务有效时才执行任务避免空任务 / 停止后执行任务。3拒绝策略落地AddTask 与 submit// 右值任务提交 void FixedThreadPool::AddTask(TaskType task) { if(m_queue.Put(std::forwardTaskType(task)) ! 0) { LOG_INFOtask(); task(); // 调用者运行队列满/停止时由提交任务的线程执行 } } // 左值任务提交 void FixedThreadPool::AddTask(const TaskType task) { if(m_queue.Put(task) ! 0) { LOG_INFOtask(); task(); // 调用者运行策略 } } // 带返回值的任务提交通用模板 template class Func, class... Args auto submit(Func func, Args ...args) { using RetType decltype(func(args...)); // 封装任务为packaged_task支持获取返回值 auto task std::make_sharedstd::packaged_taskRetType()( std::bind(forwardFunc(func), forwardArgs(args)...) ); std::futureRetType result task-get_future(); // 提交任务到队列失败则调用者运行 if (m_queue.Put([task]() - void { (*task)(); }) ! 0) { LOG_ERROR Add task run task; (*task)(); } return result; }核心亮点AddTask根据m_queue.Put的返回值判断是否触发拒绝策略 —— 若返回非 0队列满 / 停止则由提交任务的线程直接执行任务避免任务丢失submit封装带返回值的任务通过std::packaged_task和std::future获取任务执行结果同样支持 “调用者运行” 策略完美转发参数std::forward保证参数传递的高效性支持任意参数类型的任务。4线程池停止逻辑void FixedThreadPool::StopThreadGroup() { m_queue.WaitQueueEmptyStop(); // 等待队列空后停止 m_running false; // 等待所有工作线程退出 for (auto tha : m_threadgroup) { tha-join(); } } void FixedThreadPool::Stop() { std::call_once(m_flag, FixedThreadPool::StopThreadGroup, this); }std::call_once保证StopThreadGroup只执行一次避免重复停止导致的线程崩溃WaitQueueEmptyStop优雅停止确保队列中所有任务执行完毕后再销毁线程池。3. 测试代码Test04_18_Full.cpp验证核心功能测试代码覆盖了基础任务提交、带返回值任务提交、高并发任务提交等场景验证 “调用者运行” 策略的有效性void func(int x) { LOG_INFOfunc x: x; } int main() { tulun::FixedThreadPool mypool(5,1); // 队列容量5工作线程1 const int n 10000; for(int i 0;in;i) { mypool.AddTask(std::bind(func,i)); // 提交10000个任务 } return 0; }队列容量仅为 5工作线程 1大量任务会触发 “队列满” 状态超出队列容量的任务会由主线程调用者直接执行保证任务不丢失。三、优化点总结拒绝策略标准化通过Add函数返回值明确状态线程池层统一处理 “调用者运行” 策略逻辑解耦超时机制避免阻塞引入wait_for超时等待解决生产者无限阻塞问题优雅停止机制WaitQueueEmptyStop保证任务执行完毕后停止避免任务丢失高性能参数传递全程使用完美转发减少拷贝开销支持任意类型任务线程安全保障互斥锁 条件变量 原子变量保证多线程环境下的队列 / 线程池操作安全。四、应用场景与扩展建议适用场景高并发、任务量波动大的场景如后台服务、数据处理调用者运行策略可避免任务丢失同时控制队列负载扩展方向支持更多拒绝策略如丢弃最老任务、丢弃当前任务、自定义回调增加线程池监控如活跃线程数、任务执行耗时、队列长度实现动态线程池根据队列负载调整工作线程数。本次优化在原有固定线程池基础上强化了异常处理和任务可靠性是对 “调用者运行” 拒绝策略的工程化落地可直接应用于生产环境的高并发场景。

更多文章