主页 > 互联网  > 

c++线程池的实现

c++线程池的实现

目录

一、问题引出

二、优化建议与改进方案

1. 基础加固

2. 功能增强

3. 性能优化

三、完整优化代码示例

四、关键优化点总结

1.安全性增强:

2.功能扩展:

3.性能优化:

4.工业级特性:

五、测试验证建议


一、问题引出

在阿Q技术站看到一个c++线程池的实现。该线程池还有可以改进的地方。

#include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <functional> #include <iostream> class ThreadPool{ public: ThreadPool(size_t num_threads) : stop(false) { for(size_t i = 0; i < num_threads; ++i) { workers.emplace_back([this] { while(true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } }); } } template<class F> void enqueue(F&& f) { { std::unique_lock<std::mutex> lock(queue_mutex); tasks.emplace(std::forward<F>(f)); } condition.notify_one(); } ~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for(std::thread &worker : workers) worker.join(); } private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; }; int main() { ThreadPool pool(4); // 创建4个线程的线程池 for (int i = 0; i < 8; ++i) { pool.enqueue([i]() { std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << "\n"; std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作 }); } std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待任务完成 return 0; }

逻辑问题:

在enqueue时,如果线程池已经停止(stop=true),仍然可以添加任务,这可能导致在析构后执行任务,引发未定义行为。需要在enqueue时检查stop标志。异常处理:如果task()抛出异常,线程将终止,导致线程池中的线程数减少,影响后续任务执行。stop标志可以改用原子类型。
二、优化建议与改进方案 1. 基础加固

问题修复代码:

// 修改 enqueue 函数 template<class F> bool enqueue(F&& f) { // 返回是否提交成功 { std::unique_lock<std::mutex> lock(queue_mutex); if(stop) return false; // 池已关闭拒绝任务 tasks.emplace(std::forward<F>(f)); } condition.notify_one(); return true; } 2. 功能增强

(1) 支持任务返回值

template<class F> auto enqueue(F&& f) -> std::future<decltype(f())> { using RetType = decltype(f()); auto task = std::make_shared<std::packaged_task<RetType()>>( std::forward<F>(f) ); std::future<RetType> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if(stop) throw std::runtime_error("enqueue on stopped pool"); tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; }

(2) 异常安全处理

// 修改工作线程循环 try { task(); } catch (...) { // 记录异常或传递异常 std::lock_guard<std::mutex> lock(exception_mutex); exceptions.push_back(std::current_exception()); } 3. 性能优化

(1) 无锁队列替换

#include <boost/lockfree/queue.hpp> // 或实现无锁队列 class ThreadPool { private: boost::lockfree::queue<std::function<void()>> tasks{1000}; // 初始化容量 // 移除 mutex 和 condition_variable };

(2) 动态线程调整

void resize(size_t new_size) { if (new_size < workers.size()) { // 减少线程 } else if (new_size > workers.size()) { // 增加线程 } }
三、完整优化代码示例 #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <stdexcept> #include <functional> class ThreadPool { public: explicit ThreadPool(size_t threads, size_t max_tasks = 1000) : stop(false), task_count(0) { if(threads == 0) threads = std::thread::hardware_concurrency(); workers.reserve(threads); for(size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { while(true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(queue_mutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if(stop && tasks.empty()) return; task = std::move(tasks.front()); tasks.pop(); --task_count; } try { task(); } catch (...) { std::lock_guard<std::mutex> elock(exception_mutex); exceptions.push_back(std::current_exception()); } } }); } } template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); if(stop) throw std::runtime_error("enqueue on stopped pool"); tasks.emplace([task](){ (*task)(); }); ++task_count; } condition.notify_one(); return res; } ~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for(std::thread &worker : workers) if(worker.joinable()) worker.join(); if(!exceptions.empty()) { // 异常处理逻辑 } } private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; std::atomic_bool stop; std::atomic<size_t> task_count; std::mutex exception_mutex; std::vector<std::exception_ptr> exceptions; };
四、关键优化点总结 1.安全性增强:

原子操作 stop 和 task_count

异常捕获与传递机制

入队前状态检查

2.功能扩展:

支持任务返回值(future/packaged_task)

动态线程数量调整预留接口

任务上限保护(通过 max_tasks 参数)

3.性能优化:

原子计数器减少锁竞争

任务批处理优化(示例未展示,可添加)

无锁队列选项(需第三方库支持)

4.工业级特性:

硬件并发数自动检测

资源释放保障(joinable() 检查)

防止任务堆积的拒绝策略


五、测试验证建议 #include<cassert> /* 前面的线程池代码放在这 */ // 验证用例 void test_pool() { ThreadPool pool(4); auto future = pool.enqueue([](int a) { return a*a; }, 5); assert(future.get() == 25); // 压力测试 for(int i=0; i<10000; ++i) { pool.enqueue([i]{ std::this_thread::sleep_for(std::chrono::microseconds(1)); }); } } int main(int argc,char* argv[]) { test_pool(); return 0; }

通过静态分析工具(如Clang-Tidy)和动态测试(如ThreadSanitizer)确保线程安全。

标签:

c++线程池的实现由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“c++线程池的实现