贵阳市网站建设_网站建设公司_企业官网_seo优化
2025/12/24 23:20:25 网站建设 项目流程

Asio16-MultiThreadServicesPool.md

前面的设计,我们对asio的使用都是单线程模式,为了提升网络io并发处理的效率,这一次我们设计多线程模式下asio的使用方式。总体来说asio有两个多线程模型,第一个是启动多个线程,每个线程管理一个iocontext。第二种是只启动一个iocontext,被多个线程共享,后面的文章会对比两个模式的区别,这里先介绍第一种模式,多个线程,每个线程管理独立的iocontext服务。

image-20250925151546676

特点:

  • 每个io_context在不同的线程里面run(),所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。
  • 因为我们设置的池子的大小为cpu的核心数,如果为16,那么每次GetService返回的io_context都是从下标0-15,当下标为16又模除回0,所以第一个创建的CSession,和第17个创建的CSession使用的都是用一个io_context.也就是说,这两个CSession运行在同一个线程.所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。
  • 多线程相比单线程,极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用, 如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是穿行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个调用的情况,比如两个socket被部署到不同的iocontext上,但是当两个socket部署到同一个iocontext上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦合了,不会出现前一个调用时间影响下一个回调触发的问题。

池子的实现:

// .h
#include "Singleton.hpp"
#include <boost/asio.hpp>
#include <boost/asio/executor_work_guard.hpp> // <-- add this line#include <atomic>
#include <memory>
#include <thread>
#include <vector>class ServicesPool : public Singleton<ServicesPool> {
public:friend class Singleton<ServicesPool>;~ServicesPool();ServicesPool(const ServicesPool&) = delete;ServicesPool& operator=(const ServicesPool&) = delete;using Services = boost::asio::io_context;using Work = boost::asio::executor_work_guard<Services::executor_type>;using WorkPtr = std::unique_ptr<Work>;boost::asio::io_context& GetService();void Stop();private:ServicesPool(std::size_t pool_size = std::thread::hardware_concurrency());private:std::vector<std::jthread> _threads;std::vector<Services> _services;std::vector<WorkPtr> _works;std::atomic<std::size_t> _next_io_context;
};// .cpp
#include "ServicesPool.h"
#include <iostream>ServicesPool::ServicesPool(std::size_t pool_size): _services(pool_size), _next_io_context(0)
{_works.reserve(pool_size);for (std::size_t i = 0; i < pool_size; ++i) {_works.emplace_back(std::make_unique<Work>(boost::asio::make_work_guard(_services[i])));// _works.emplace_back(std::make_unique<Work>(_services[i].get_executor()));}for (std::size_t i = 0; i < pool_size; ++i) {_threads.emplace_back([this, i]() {_services[i].run();});}
}ServicesPool::~ServicesPool()
{std::cout << "ServicesPool::~ServicesPool()" << std::endl;
}boost::asio::io_context& ServicesPool::GetService()
{std::size_t index = _next_io_context.fetch_add(1, std::memory_order_relaxed);return _services[index % _services.size()];
}void ServicesPool::Stop()
{for (auto& work : _works) {work.reset();}
}

main.cpp:

try {auto pool = ServicesPool::GetInstance();boost::asio::io_context io_context;boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);signals.async_wait([&](const boost::system::error_code& error, int signal_number) {io_context.stop();});CServer s(io_context, 9999);io_context.run();} catch (std::exception& e) {std::cerr << "Exception: " << e.what() << endl;}

主要的结构就是,main里面创建一个io_context,这个io_context传入CServer s中用于异步的检测连接到来,accept,同时我们注册了信号,用于优雅地退出.而在CServer触发accept的回调的时候,取出线程池中的io_context,用于对应的CSession通信.

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询