主页 > 电脑硬件  > 

Linux从0到1——线程池【利用日志Debug】

Linux从0到1——线程池【利用日志Debug】

Linux从0到1——线程池 1. 简易日志系统1.1 可变参数列表1.2 设计日志1.3 改进 2. 线程池2.1 需要用到的头文件2.2 需求分析2.3 实现细节2.4 完整代码 3. 改造单例版本4. 改进思路


1. 简易日志系统
1.1 可变参数列表

1. printf

我们其实一直在使用可变参数列表,printf函数就使用了它。

第一个参数format是一个字符串,指定可变参数的数据格式。第二个参数...就是可变参数。

2. 自己设计一个可变参数的接口,计算任意整数的和

#include <iostream> #include <cstdarg> // 包含处理可变参数的宏 // 函数声明 int sum(int count, ...); // 函数定义 int sum(int count, ...) { int total = 0; va_list args; // 定义一个 va_list 对象,这是一个char *类型(或者void *)的指针 // 初始化 va_list 对象 va_start(args, count); // 遍历所有参数 for (int i = 0; i < count; ++i) { total += va_arg(args, int); // 获取下一个参数并累加 } // 清理 va_list 对象 va_end(args); return total; } int main() { // 测试函数 int result = sum(5, 1, 2, 3, 4, 5); std::cout << "Sum: " << result << std::endl; result = sum(3, 10, 20, 30); std::cout << "Sum: " << result << std::endl; return 0; } va_list:是一个char*或void*的指针类型,不同编译器可能有所差别。va_start:初始化 va_list 对象,使其指向第一个可变参数。第二个参数固定传可变参数的前一个参数。va_arg:从可变参数列表中获取下一个参数。需要指定参数的类型(在这里是 int)。va_end:清理 va_list 对象,确保资源被正确释放(就是将该指针置空)。


1.2 设计日志

1. 需求分析

首先,这个日志中的信息要有各种等级,来表明各个信息的重要程度,和他们的意义;日志信息要能向显示器写入,也能向文件写入。向文件写入时,可以向一个文件写入,也可以按信息不同等级,向不同等级文件写入;每一条日志信息的组成:[等级][写入时间][进程id] 用户自定义信息。

2. 大致思路

设计两个枚举类型,一个来表示信息等级,一个来表示信息输出的方式。log类对外只暴漏一个接口,功能是写入日志信息。

3. 完整的日志类实现Log.hpp

#pragma once #include<iostream> #include<string> #include<cstdarg> #include<ctime> #include<fstream> #include<sys/types.h> #include<sys/stat.h> #include<fcntl.h> #include<unistd.h> enum { Debug = 0, Info, // 正常信息 Warning, // 告警 Error, // 错误 Fatal // 致命错误 }; enum { Screen = 0, // 向显示器打印 OneFile, // 向一个文件打印 ClassFile // 向多个文件打印 }; // 将日志等级转换为string std::string LevelToString(int level) { switch(level) { case Debug: return "Debug"; case Info: return "Info"; case Warning: return "Warning"; case Error: return "Error"; case Fatal: return "Fatal"; default: return "Unknown"; } } const int defaultstyle = Screen; // 默认风格是向显示器打印 const std::string default_filename = "log."; // 默认文件名 const std::string logdir = "log"; // 默认日志文件夹 class Log { public: Log() :_style(defaultstyle) ,_filename(default_filename) { mkdir(logdir.c_str(), 0775); } void Enable(int style) { _style = style; } std::string TimeStampExLocalTime() { time_t currtime = time(nullptr); struct tm *curr = localtime(&currtime); char time_buffer[128]; snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \ curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\ curr->tm_hour, curr->tm_min, curr->tm_sec); return time_buffer; } void WriteLogToOneFile(const std::string &logname, const std::string &message) { umask(0); int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666); if(fd < 0) return; write(fd, message.c_str(), message.size()); close(fd); } void WriteLogToClassFile(const std::string &levelstr, const std::string &message) { std::string logname = logdir; logname += "/"; logname += _filename; logname += levelstr; WriteLogToOneFile(logname, message); } void WriteLog(const std::string &levelstr, const std::string &message) { switch(_style) { case Screen: std::cout << message; break; case OneFile: WriteLogToClassFile("all", message); break; case ClassFile: WriteLogToClassFile(levelstr, message); break; default: break; } } void LogMessage(int level, const char* format, ...) // 类C的一个日志接口 { char rightbuffer[1024]; va_list args; // 这是一个char *类型(或者void *)的指针 va_start(args, format); // 让arg指向可变部分 vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 将可变部分按照指定格式写入到content中 va_end(args); // 释放args, args = nullptr char leftbuffer[1024]; std::string levelstr = LevelToString(level); std::string currtime = TimeStampExLocalTime(); // 获取当前时间 std::string idstr = std::to_string(getpid()); snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \ levelstr.c_str(), currtime.c_str(), idstr.c_str()); std::string loginfo = leftbuffer; loginfo += rightbuffer; WriteLog(levelstr, loginfo); } ~Log() {} private: int _style; std::string _filename; }; LogMessage是对外暴漏的接口,用户可以通过该接口,写入格式化的日志信息。第一个参数是信息等级,第二个是信息格式,第三个是可变参数列表。日志在使用时需要Enable使能,指定日志的输出方式。日志信息统一放在一个的文件夹中,名字默认是log。向文件中写入时,文件名的格式是:log.等级。其中如果指定向一个文件中写入,文件名是log.all。

4. 测试代码:

int main() { Log log; log.Enable(OneFile); log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14); log.LogMessage(Error, "hello %d world %lf\n", 10, 3.14); log.LogMessage(Warning, "hello %d world %lf\n", 10, 3.14); log.LogMessage(Info, "hello %d world %lf\n", 10, 3.14); log.LogMessage(Fatal, "hello %d world %lf\n", 10, 3.14); log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14); log.LogMessage(Debug, "hello %d world %lf\n", 10, 3.14); return 0; }


1.3 改进

1. 线程安全问题

向文件或显示器写入时,不是线程安全的,可以考虑对WriteLog方法上锁。 #include"LockGuard.hpp" ... class Log { public: Log() :_style(defaultstyle) ,_filename(default_filename) { mkdir(logdir.c_str(), 0775); pthread_mutex_init(&_mutex, nullptr); } ... void LogMessage(int level, const char* format, ...) // 类C的一个日志接口 { ... { LockGuard lockguard(&_mutex); WriteLog(levelstr, loginfo); } } ~Log() {} private: int _style; std::string _filename; pthread_mutex_t _mutex; }; LockGuard.hpp是我们之前实现过的守护锁。

2. 配置log

以下代码可以写在Log.hpp头文件中,配置log只需要修改Config类的构造函数即可。 // 配置log Log log; class Config { public: Config() { // 在此配置 log.Enable(ClassFile); } ~Config() {} }; Config config;
2. 线程池
2.1 需要用到的头文件

1. Log.hpp

#pragma once #include<iostream> #include<string> #include<cstdarg> #include<ctime> #include<fstream> #include<sys/types.h> #include<sys/stat.h> #include<fcntl.h> #include<unistd.h> #include"LockGuard.hpp" enum { Debug = 0, Info, // 正常信息 Warning, // 告警 Error, // 错误 Fatal // 致命错误 }; enum { Screen = 0, // 向显示器打印 OneFile, // 向一个文件打印 ClassFile // 向多个文件打印 }; // 将日志等级转换为string std::string LevelToString(int level) { switch(level) { case Debug: return "Debug"; case Info: return "Info"; case Warning: return "Warning"; case Error: return "Error"; case Fatal: return "Fatal"; default: return "Unknown"; } } const int defaultstyle = Screen; // 默认风格是向显示器打印 const std::string default_filename = "log."; // 默认文件名 const std::string logdir = "log"; // 默认日志文件夹 class Log { public: Log() :_style(defaultstyle) ,_filename(default_filename) { mkdir(logdir.c_str(), 0775); pthread_mutex_init(&_mutex, nullptr); } void Enable(int style) { _style = style; } std::string TimeStampExLocalTime() { time_t currtime = time(nullptr); struct tm *curr = localtime(&currtime); char time_buffer[128]; snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", \ curr->tm_year+1900, curr->tm_mon+1, curr->tm_mday,\ curr->tm_hour, curr->tm_min, curr->tm_sec); return time_buffer; } void WriteLogToOneFile(const std::string &logname, const std::string &message) { umask(0); int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666); if(fd < 0) return; write(fd, message.c_str(), message.size()); close(fd); } void WriteLogToClassFile(const std::string &levelstr, const std::string &message) { std::string logname = logdir; logname += "/"; logname += _filename; logname += levelstr; WriteLogToOneFile(logname, message); } void WriteLog(const std::string &levelstr, const std::string &message) { switch(_style) { case Screen: std::cout << message; break; case OneFile: WriteLogToClassFile("all", message); break; case ClassFile: WriteLogToClassFile(levelstr, message); break; default: break; } } void LogMessage(int level, const char* format, ...) // 类C的一个日志接口 { char rightbuffer[1024]; va_list args; // 这是一个char *类型(或者void *)的指针 va_start(args, format); // 让arg指向可变部分 vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 将可变部分按照指定格式写入到content中 va_end(args); // 释放args, args = nullptr char leftbuffer[1024]; std::string levelstr = LevelToString(level); std::string currtime = TimeStampExLocalTime(); // 获取当前时间 std::string idstr = std::to_string(getpid()); snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ", \ levelstr.c_str(), currtime.c_str(), idstr.c_str()); std::string loginfo = leftbuffer; loginfo += rightbuffer; { LockGuard lockguard(&_mutex); WriteLog(levelstr, loginfo); } } ~Log() {} private: int _style; std::string _filename; pthread_mutex_t _mutex; }; // 配置log Log log; class Config { public: Config() { // 在此配置 log.Enable(ClassFile); } ~Config() {} }; Config config;

2. Thread.hpp

#pragma once #include <iostream> #include <string> #include <functional> template<class T> using func_t = std::function<void(T&)>; template<class T> class Thread { public: Thread(const std::string &threadname, func_t<T> func, T &data) :_tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data) {} static void *ThreadRoutine(void* args) { Thread *ts = static_cast<Thread *>(args); ts->_func(ts->_data); return nullptr; } bool Start() { int n = pthread_create(&_tid, nullptr, ThreadRoutine, this); if(n == 0) { _isrunning = true; return true; } else return false; } bool Join() { if(!_isrunning) return false; int n = pthread_join(_tid, nullptr); if(n == 0) { _isrunning = false; return true; } return false; } bool IsRunning() { return _isrunning; } std::string ThreadName() { return _threadname; } ~Thread() {} private: pthread_t _tid; // 库级别线程id std::string _threadname; // 线程名 bool _isrunning; // 运行状态 func_t<T> _func; // 线程执行的回调方法 T _data; };

3. LockGuard.hpp

#pragma once #include <pthread.h> // 不定义锁,默认认为外部会给我们传入锁对象 class Mutex { public: Mutex(pthread_mutex_t *lock):_lock(lock) {} void Lock() { pthread_mutex_lock(_lock); } void Unlock() { pthread_mutex_unlock(_lock); } ~Mutex() {} private: pthread_mutex_t *_lock; }; class LockGuard { public: LockGuard(pthread_mutex_t *lock):_mutex(lock) { _mutex.Lock(); } ~LockGuard() { _mutex.Unlock(); } private: Mutex _mutex; };

4. Task.hpp

#pragma once #include <iostream> #include <string> #include <unistd.h> const int defaultvalue = 0; enum { ok = 0, div_zero, // 除0错误 mod_zero, // 模0错误 unknow // 未知错误 }; const std::string opers = "+-*/%)(&"; class Task { public: Task() { } Task(int x, int y, char op) : data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok) { } void Run() { switch (oper) { case '+': result = data_x + data_y; break; case '-': result = data_x - data_y; break; case '*': result = data_x * data_y; break; case '/': { if (data_y == 0) code = div_zero; else result = data_x / data_y; } break; case '%': { if (data_y == 0) code = mod_zero; else result = data_x % data_y; } break; default: code = unknow; break; } } std::string PrintTask() { std::string s; s = std::to_string(data_x); s += oper; s += std::to_string(data_y); s += "=?"; return s; } std::string PrintResult() { std::string s; s = std::to_string(data_x); s += oper; s += std::to_string(data_y); s += "="; s += std::to_string(result); s += " ["; s += std::to_string(code); s += "]"; return s; } void operator()() { Run(); } ~Task() { } private: int data_x; int data_y; char oper; // + - * / % int result; int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4 };
2.2 需求分析

这个线程池要有一个任务队列queue,和一个线程数组vector,任务由外部Push进线程池内部的任务队列中,多个线程并发的从任务队列拿任务,并执行。

并发访问任务队列时,会有线程安全问题,需要控制同步和互斥:

Push操作一定是互斥的,Pop操作一定是互斥的;如果任务队列为空,则让线程等待,Push进去一个任务,就激活一个线程。

每一个线程需要执行的逻辑是,先从任务队列中拿任务,如果没拿到(说明任务队列为空),就先等待;拿到了就立即执行该任务。


2.3 实现细节

1. 线程池的内部成员变量

一个任务队列;一个线程数组;一个整数类型_thread_num记录线程池中的线程数量;一个互斥量和一个条件变量,用来控制访问任务队列时的同步和互斥。 template <class T> class ThreadPool { ... private: std::queue<T> _q; std::vector<Thread<ThreadData>> _threads; // ThreadData是线程信息类 int _thread_num; // 线程个数 pthread_mutex_t _mutex; pthread_cond_t _cond; };

2. 设计一个线程信息类,存储每个线程的名字(可扩展)

class ThreadData { public: ThreadData(const std::string &threadname) :_threadname(threadname) {} ~ThreadData() {} public: std::string _threadname; };

3. 构造函数和析构函数

构造函数需要初始化互斥量和条件变量,并且创建指定个数的线程,不启动;析构函数只需要释放互斥量和条件变量即可。 static const int default_num = 5; // 默认线程池中的线程个数 template <class T> class ThreadPool { public: ThreadPool(int thread_num = default_num) : _thread_num(thread_num) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); // 构建指定个数的线程 for (int i = 0; i < _thread_num; i++) { // 待优化 std::string threadname = "thread-"; threadname += std::to_string(i + 1); ThreadData td(threadname); Thread<ThreadData> t(threadname, \ std::bind(&ThreadPool<T>::ThreadRun, \ this, std::placeholders::_1), td); _threads.push_back(t); log.LogMessage(Info, "%s is created...\n", threadname.c_str()); } } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } ... private: std::queue<T> _q; std::vector<Thread<ThreadData>> _threads; int _thread_num; // 线程个数 pthread_mutex_t _mutex; pthread_cond_t _cond; }; std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1)是一个函数模版对象,其中ThreadRun为每一个线程要执行的方法,这个方法是一个成员函数。

4. ThreadRun方法

向队列中拿任务时,需要上锁;队列中没有任务时,需要让线程等待。 template <class T> class ThreadPool { public: void ThreadWait(const ThreadData &td) { log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str()); pthread_cond_wait(&_cond, &_mutex); } void ThreadRun(ThreadData &td) { while (true) { // 取任务 T t; { LockGuard lockguard(&_mutex); while (_q.empty()) { ThreadWait(td); log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str()); } t = _q.front(); _q.pop(); } // 处理任务 t(); log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \ td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str()); } } ... private: std::queue<T> _q; std::vector<Thread<ThreadData>> _threads; int _thread_num; // 线程个数 pthread_mutex_t _mutex; pthread_cond_t _cond; };

5. Push方法,向任务队列中放任务

访问队列时上锁,Push完成后激活一个等待的线程。 template <class T> class ThreadPool { public: void ThreadWakeup() { pthread_cond_signal(&_cond); } void Push(T &in) { log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str()); LockGuard lockgurad(&_mutex); _q.push(in); ThreadWakeup(); } ... private: std::queue<T> _q; std::vector<Thread<ThreadData>> _threads; int _thread_num; // 线程个数 pthread_mutex_t _mutex; pthread_cond_t _cond; };

6. 启动线程池,join线程池中的线程

template <class T> class ThreadPool { public: bool Start() { // 启动 for (auto &thread : _threads) { thread.Start(); log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str()); } return true; } // for debug void Wait() { for (auto &thread : _threads) { thread.Join(); } } ... private: std::queue<T> _q; std::vector<Thread<ThreadData>> _threads; int _thread_num; // 线程个数 pthread_mutex_t _mutex; pthread_cond_t _cond; };
2.4 完整代码
#pragma once #include <iostream> #include <queue> #include <pthread.h> #include <vector> #include <functional> #include "Log.hpp" #include "Thread.hpp" #include "LockGuard.hpp" static const int default_num = 5; // 默认线程池中的线程个数 class ThreadData { public: ThreadData(const std::string &threadname) :_threadname(threadname) {} ~ThreadData() {} public: std::string _threadname; }; template <class T> class ThreadPool { public: ThreadPool(int thread_num = default_num) : _thread_num(thread_num) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); // 构建指定个数的线程 for (int i = 0; i < _thread_num; i++) { // 待优化 std::string threadname = "thread-"; threadname += std::to_string(i + 1); ThreadData td(threadname); Thread<ThreadData> t(threadname, \ std::bind(&ThreadPool<T>::ThreadRun, \ this, std::placeholders::_1), td); _threads.push_back(t); log.LogMessage(Info, "%s is created...\n", threadname.c_str()); } } bool Start() { // 启动 for (auto &thread : _threads) { thread.Start(); log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str()); } return true; } void ThreadWait(const ThreadData &td) { log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str()); pthread_cond_wait(&_cond, &_mutex); } void ThreadWakeup() { pthread_cond_signal(&_cond); } void ThreadRun(ThreadData &td) { while (true) { // 取任务 T t; { LockGuard lockguard(&_mutex); while (_q.empty()) { ThreadWait(td); log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str()); } t = _q.front(); _q.pop(); } // 处理任务 t(); log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \ td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str()); } } void Push(T &in) { log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str()); LockGuard lockgurad(&_mutex); _q.push(in); ThreadWakeup(); } // for debug void Wait() { for (auto &thread : _threads) { thread.Join(); } } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: std::queue<T> _q; std::vector<Thread<ThreadData>> _threads; int _thread_num; // 线程个数 pthread_mutex_t _mutex; pthread_cond_t _cond; };

测试代码

#include<iostream> #include<memory> #include"ThreadPool.hpp" #include"Task.hpp" #include"Log.hpp" int main() { std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>()); tp->Start(); srand((uint64_t)time(nullptr) ^ getpid()); while(true) { int x = rand() % 100 + 1; usleep(1234); int y = rand() % 200; usleep(1234); char oper = opers[rand() % opers.size()]; Task t(x, y, oper); tp->Push(t); sleep(1); } tp->Wait(); return 0; }
3. 改造单例版本

线程池一般在整个程序中只有唯一一份,适合用单例模式实现,下面实现“懒汉”线程池。

第一步:构造函数私有化,并禁掉拷贝构造和赋值

template <class T> class ThreadPool { private: ThreadPool(int thread_num = default_num) : _thread_num(thread_num) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); // 构建指定个数的线程 for (int i = 0; i < _thread_num; i++) { // 待优化 std::string threadname = "thread-"; threadname += std::to_string(i + 1); ThreadData td(threadname); Thread<ThreadData> t(threadname, \ std::bind(&ThreadPool<T>::ThreadRun, \ this, std::placeholders::_1), td); _threads.push_back(t); log.LogMessage(Info, "%s is created...\n", threadname.c_str()); } } ThreadPool(const ThreadPool<T> &tp) = delete; const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete; public: ... private: ... };

第二步:提供静态指针instance,并提供静态方法GetInstance获取静态指针

template <class T> class ThreadPool { private: // 构造函数私有化,并禁掉拷贝构造和赋值 ... public: // 有线程安全问题 static ThreadPool<T> *GetInstance() { LockGuard lockguard(&sig_lock); if(instance == nullptr) { log.LogMessage(Info, "创建单例成功...\n"); instance = new ThreadPool<T>(); } return instance; } private: // 成员变量 ... static ThreadPool<T> *instance; static pthread_mutex_t sig_lock; }; template<class T> ThreadPool<T>* ThreadPool<T>::instance = nullptr; template<class T> pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

特别注意:

GetInstance函数存在线程安全问题,同时可能有多个线程满足instance == nullptr,从而错误的调用多次new方法,所以需要对GetInstance函数整个上锁;这把锁保护的是instance,所以需要我们设计一个新的互斥量,并且因为GetInstance方法是静态的,没有this指针,所以这个新的互斥量也要是静态的;可是如果是上面这种写法,效率是很低的,在instance不为空时,任何线程想要调用GetInstance应该是立即返回的,但是此时却要回回上锁;下面是GetInstance的改良版,保证了在instance不为空时,任何线程想要调用GetInstance会立即返回,不需要上锁。 template <class T> class ThreadPool { private: ... public: // 有线程安全问题 static ThreadPool<T> *GetInstance() { if(instance == nullptr) { LockGuard lockguard(&sig_lock); if(instance == nullptr) { log.LogMessage(Info, "创建单例成功...\n"); instance = new ThreadPool<T>(); } } return instance; } private: ... static ThreadPool<T> *instance; static pthread_mutex_t sig_lock; }; template<class T> ThreadPool<T>* ThreadPool<T>::instance = nullptr; template<class T> pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

完整单例线程池代码

#pragma once #include <iostream> #include <queue> #include <pthread.h> #include <vector> #include <functional> #include "Log.hpp" #include "Thread.hpp" #include "LockGuard.hpp" static const int default_num = 5; // 默认线程池中的线程个数 class ThreadData { public: ThreadData(const std::string &threadname) :_threadname(threadname) {} ~ThreadData() {} public: std::string _threadname; }; template <class T> class ThreadPool { private: ThreadPool(int thread_num = default_num) : _thread_num(thread_num) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); // 构建指定个数的线程 for (int i = 0; i < _thread_num; i++) { // 待优化 std::string threadname = "thread-"; threadname += std::to_string(i + 1); ThreadData td(threadname); Thread<ThreadData> t(threadname, \ std::bind(&ThreadPool<T>::ThreadRun, \ this, std::placeholders::_1), td); _threads.push_back(t); log.LogMessage(Info, "%s is created...\n", threadname.c_str()); } } ThreadPool(const ThreadPool<T> &tp) = delete; const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete; public: // 有线程安全问题 static ThreadPool<T> *GetInstance() { if(instance == nullptr) { LockGuard lockguard(&sig_lock); if(instance == nullptr) { log.LogMessage(Info, "创建单例成功...\n"); instance = new ThreadPool<T>(); } } return instance; } bool Start() { // 启动 for (auto &thread : _threads) { thread.Start(); log.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str()); } return true; } void ThreadWait(const ThreadData &td) { log.LogMessage(Debug, "no task, %s is sleeping...\n", td._threadname.c_str()); pthread_cond_wait(&_cond, &_mutex); } void ThreadWakeup() { pthread_cond_signal(&_cond); } void ThreadRun(ThreadData &td) { while (true) { // 取任务 T t; { LockGuard lockguard(&_mutex); while (_q.empty()) { ThreadWait(td); log.LogMessage(Debug, "thread, %s wake up.\n", td._threadname.c_str()); } t = _q.front(); _q.pop(); } // 处理任务 t(); log.LogMessage(Info, "%s handler task %s done, result is: %s\n", \ td._threadname.c_str(), t.PrintTask().c_str(), t.PrintResult().c_str()); } } void Push(T &in) { log.LogMessage(Debug, "other thread push a task, task is: %s\n", in.PrintTask().c_str()); { LockGuard lockgurad(&_mutex); _q.push(in); ThreadWakeup(); } } // for debug void Wait() { for (auto &thread : _threads) { thread.Join(); } } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: std::queue<T> _q; // 任务队列 std::vector<Thread<ThreadData>> _threads; int _thread_num; // 线程个数 pthread_mutex_t _mutex; pthread_cond_t _cond; static ThreadPool<T> *instance; static pthread_mutex_t sig_lock }; template<class T> ThreadPool<T>* ThreadPool<T>::instance = nullptr; template<class T> pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

测试代码

#include<iostream> #include<memory> #include"ThreadPool.hpp" #include"Task.hpp" #include"Log.hpp" int main() { ThreadPool<Task>::GetInstance()->Start(); srand((uint64_t)time(nullptr) ^ getpid()); while(true) { int x = rand() % 100 + 1; usleep(1234); int y = rand() % 200; usleep(1234); char oper = opers[rand() % opers.size()]; Task t(x, y, oper); ThreadPool<Task>::GetInstance()->Push(t); sleep(1); } ThreadPool<Task>::GetInstance()->Wait(); return 0; }
4. 改进思路

设计一个动态线程池,可以根据当前的线程池中的线程数量和任务数量,动态创建或销毁线程。

具体思路以伪代码的形式呈现给大家,感兴趣的小伙伴可以自己实现:

template <class T> class ThreadPool { private: ... public: ... void CheckSelf() { // 1. _task_num > _task_num_high_water && _thread_num < _thread_num_high_water // 创建更多线程,并更新_thread_num // 2. _task_num == _task_num_low_water && _thread_num >= _thread_num_high_water // 把自己退出了,并且更新_thread_num } void ThreadRun(ThreadData &td) { while (true) { // 取任务 CheckSelf(); ... } } private: ... // 扩展 int _task_num; // 任务数量 // 高低水位线 int _thread_num_low_water; int _thread_num_high_water; int _task_num_low_water; int _task_num_high_water; };
标签:

Linux从0到1——线程池【利用日志Debug】由讯客互联电脑硬件栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Linux从0到1——线程池【利用日志Debug】