主页 > 手机  > 

【Linux系统】生产者消费者模型:基于阻塞队列BlockingQueue

【Linux系统】生产者消费者模型:基于阻塞队列BlockingQueue

线程安全的阻塞队列(Blocking Queue)允许生产者和消费者在多线程环境下安全地共享数据。下面我将逐步剖析阻塞队列代码的编写逻辑,并讲解如何编写一个阻塞队列。


1. 基本概念与需求 阻塞队列是一种特殊的队列,当队列满时,生产者线程会被阻塞直到有空间可用;当队列空时,消费者线程会被阻塞直到有数据可用。线程安全是关键:多个线程同时访问队列时,必须确保数据一致性。使用条件变量(Cond)和互斥锁(Mutex)来实现线程间的同步和通信。(注意本文使用的 条件变量(Cond)和互斥锁(Mutex)都是我自己封装过的,完整代码放到文末)
2. 代码结构分析 头文件保护 #ifndef _BLOCKING_QUEUE_HPP_ #define _BLOCKING_QUEUE_HPP_ 防止头文件重复包含,避免编译错误。 引入必要的库 #include <iostream> #include <queue> #include <pthread.h> #include "Cond.hpp" #include "Mutex.hpp" <iostream>:用于调试输出。<queue>:标准库中的队列容器,用于存储数据。<pthread.h>:POSIX线程相关函数(假设 Cond 和 Mutex 是基于此实现的)。"Cond.hpp" 和 "Mutex.hpp":封装了条件变量和互斥锁的功能模块。 命名空间 namespace BlockingQueueModule { template<typename T> class BlockQueue { ... }; } 将阻塞队列封装在命名空间 BlockingQueueModule 中,避免命名冲突。
3. 类定义与成员变量 类模板定义 template<typename T> class BlockQueue { public: BlockQueue(int cap = 5) : _size(0), _cap(cap) {} ~BlockQueue() {} ... private: std::queue<T> _bq; // 存储数据的队列 int _size; // 队列中数据的个数 int _cap; // 队列容量 Cond _producer_cond; // 生产者条件变量 Cond _consumer_cond; // 消费者条件变量 Mutex _mutex; // 互斥锁 int producer_count; // 等待的生产者数量 int consumer_count; // 等待的消费者数量 }; 模板参数 T:支持任意类型的元素。构造函数: 初始化 _size 为 0,表示队列初始为空。设置队列容量 _cap,默认值为 5。 成员变量: _bq:底层存储数据的标准队列。_size 和 _cap:分别表示当前队列大小和最大容量。_producer_cond 和 _consumer_cond:用于生产者和消费者的等待/唤醒机制。_mutex:保护对共享资源的访问。producer_count 和 consumer_count:记录当前等待的生产者和消费者数量。
4. 核心功能实现 生产者方法:Equeue(T& data) void Equeue(T& data) { _mutex.lock(); // 加锁,确保线程安全 while (IsFull()) { // 使用 while 而不是 if,防止伪唤醒问题 producer_count++; std::cout << "queue is full, 生产者进入阻塞" << '\n'; _producer_cond.Wait(_mutex); // 如果队列满,生产者线程阻塞 std::cout << "生产者被唤醒" << '\n'; producer_count--; } _bq.push(data); // 将数据加入队列 _size++; if (consumer_count) { // 如果有消费者在等待 std::cout << "唤醒消费者" << '\n'; _consumer_cond.NotifyOne(); // 唤醒一个消费者线程 } _mutex.unlock(); // 解锁 } 加锁:使用 _mutex.lock() 确保只有一个线程可以修改队列。检查队列是否已满: 如果队列已满(IsFull() 返回 true),调用 _producer_cond.Wait(_mutex),让当前生产者线程进入等待状态。当队列中有空闲空间时,条件变量会唤醒该线程。 插入数据:将数据压入队列,并更新队列大小。唤醒消费者:如果当前有消费者在等待(consumer_count > 0),调用 _consumer_cond.NotifyOne() 唤醒其中一个消费者线程。解锁:释放锁,允许其他线程访问队列。 消费者方法:Pop() T Pop() { _mutex.lock(); // 加锁 while (IsEmpty()) { // 使用 while 而不是 if,防止伪唤醒问题 consumer_count++; std::cout << "queue is empty, 消费者进入阻塞" << '\n'; _consumer_cond.Wait(_mutex); // 如果队列空,消费者线程阻塞 std::cout << "消费者被唤醒" << '\n'; consumer_count--; } T data = _bq.back(); // 获取队列尾部的数据 _bq.pop(); // 移除队列尾部的数据 _size--; if (producer_count) { // 如果有生产者在等待 std::cout << "唤醒生产者" << '\n'; _producer_cond.NotifyOne(); // 唤醒一个生产者线程 } _mutex.unlock(); // 解锁 return data; } 加锁:确保线程安全。检查队列是否为空: 如果队列为空(IsEmpty() 返回 true),调用 _consumer_cond.Wait(_mutex),让当前消费者线程进入等待状态。当队列中有数据时,条件变量会唤醒该线程。 获取数据:从队列尾部取出数据并移除。唤醒生产者:如果当前有生产者在等待(producer_count > 0),调用 _producer_cond.NotifyOne() 唤醒其中一个生产者线程。解锁:释放锁,允许其他线程访问队列。 辅助方法 bool IsEmpty() { return _bq.empty(); } bool IsFull() { return _size == _cap; } IsEmpty():判断队列是否为空。IsFull():判断队列是否已满。
5. 设计细节与注意事项 为什么需要 if (consumer_count) 或 if (producer_count)? 条件变量的 NotifyOne() 方法只会唤醒一个等待的线程。如果直接调用 NotifyOne() 而不检查是否有等待线程,可能会导致不必要的唤醒,浪费系统资源。因此,只有在确实有等待线程时才调用 NotifyOne()。 为什么使用 while 而不是 if?

在多线程环境中,可能存在虚假唤醒(spurious wakeup)问题,也就是伪唤醒。

具体来说:一个线程被唤醒,能够继续往下消费或生产一个物品,需要满足两个条件:1、获取到互斥锁;2、当前队列不为极端情况(不是满的或不是空的)

如果一个消费者线程被唤醒,它获取到了互斥锁,但此时队列里为空,它还继续往下执行代码消费物品,岂不是出错?

这种在条件不满足(为满或为空,线程不能生产或消费)的情况下被唤醒,叫做伪唤醒!!!

因此需要将 if 该成 while:竞争失败了就应该继续进入 wait 状态

条件变量休眠 pthread_cond_wait(&cond, &lock) 需要传入当前线程持有的锁,因为线程在进入条件变量睡眠时,是持有锁的!线程一般是在线程内部的临界区进行条件变量休眠的。如果让一个线程带着锁休眠,就会导致死锁或资源竞争问题。因此需要传入锁,让 pthread_cond_wait 自动释放锁。 问题:线程的 pthread_cond_wait 醒来时,若此时互斥锁获取失败怎么办? 如果线程从 pthread_cond_wait 中醒来但无法获取锁,则该线程会被放入锁的等待队列中。等待队列中的线程会一直等到锁可用为止,然后继续执行后续逻辑。 线程唤醒逻辑放在释放锁之前还是之后? 放在释放锁之前: 唤醒线程后,虽然当前线程仍然持有锁,但被唤醒的线程会自动尝试获取锁。如果被唤醒的线程暂时无法获取锁,它会被放入锁的等待队列中,不会影响系统的正常运行。 放在释放锁之后: 如果在释放锁之后才唤醒其他线程,可能导致多个线程同时争抢锁。对于消费者线程来说,可能会导致所有消费者线程迅速消耗完队列中的资源,最终使队列变空。对于生产者线程来说,可能会导致队列快速填满,从而引发阻塞。 将唤醒逻辑放在释放锁之前通常更好,因为它避免了不必要的锁争抢,同时保证了线程间的公平性。 如何实现多生产者多消费者? 直接创建多个生产者和消费者线程即可。在生产者和生产者之间,在消费者和消费者之间的关系是互斥的,而我们线程之间对于同一个临界区用的是同一把锁,因此天然地就具有一种互斥关系! 为什么定义多生产者时会出现生产相同数据的情况? 因为每个生产者线程都有自己的栈空间,因此它们可以独立生成数据。如果多个生产者线程生成的数据来源相同(例如,基于相同的初始值或算法),则可能会产生相同的数据。这种情况是正常的,因为每个生产者线程的行为是独立的,除非明确要求生产唯一的数据。 生产者消费者模型的并行和串行:理解生产者消费者模型效率高的原因

生产者消费者模型实际上包含两部分,第一个部分是我们前面认识到的,生产者将任务放入任务队列,消费者从任务队列获取任务,这个访问任务队列的过程是互斥的过程,是串行的过程,无论是生产者放入还是消费者拿出,都必须一个一个的串行执行

第二个部分是并行,生产者会消耗一定时间从别处获取任务,即生产任务,等待系统给生产者发布一个任务需求,让生产者作为一个发布者发布给消费者,如阻塞等待响应任务、刷新缓冲区任务、页面响应任务、数据库数据迁移任务….消费者也会消耗一定时间处理获取到的任务。在这个过程中,生产者等待任务和消费者处理任务,生产者和消费者是并行执行的!!

实际上,在整个生产者消费者模型中,并行操作占据了绝大部分时间。相比之下,生产者和消费者访问任务队列的串行操作所占用的时间非常少。这种设计使得任务从分发到执行的过程变得极其高效——任务无需花费过多时间在分发环节上,从而显著提升了整体系统的效率。

总结来说,生产者消费者模型之所以高效,正是因为它充分利用了生产者与消费者之间的并行性,同时将串行操作的影响降到最低,从而实现了任务的快速流转与处理。


6. 总结

这段代码实现了一个功能完善的阻塞队列,其核心思想如下:

使用互斥锁保护共享资源。使用条件变量实现线程间的等待和唤醒。通过 while 循环处理伪唤醒问题。通过计数器(producer_count 和 consumer_count)优化唤醒操作。 7. 完整代码 BlockQueue.hpp #ifndef _BLOCKING_QUEUE_HPP_ #define _BLOCKING_QUEUE_HPP_ #include <iostream> #include <queue> #include <pthread.h> #include "Cond.hpp" #include "Mutex.hpp" using namespace CondModule; using namespace MutexModule; // 使用封装的条件变量和互斥锁 namespace BlockingQueueModule { // 数据 int num = 10; template<typename T> class BlockQueue { public: BlockQueue(int cap = 5) : _size(0), _cap(cap) {} ~BlockQueue() {} // 对于生产者: 数据入队列 void Equeue(T& data) { // 加锁 _mutex.lock(); while (IsFull()) { // 队列已满,生产者阻塞 producer_count++; std::cout << "queue is full, 生产者进入阻塞" << '\n'; _producer_cond.Wait(_mutex); std::cout << "生产者被唤醒" << '\n'; producer_count--; } // 生产数据 _bq.push(data); _size++; // 生产完数据,唤醒消费者 if(consumer_count){ // 为什么要加上这个判断:必须保证现在有消费者在等待 std::cout << "唤醒消费者" << '\n'; _consumer_cond.NotifyOne(); } // 解锁 _mutex.unlock(); } // 对于消费者: 数据出队列 T Pop() { // 加锁 _mutex.lock(); while (IsEmpty()) { // 队列为空,消费者阻塞 consumer_count++; std::cout << "queue is empty, 消费者进入阻塞" << '\n'; _consumer_cond.Wait(_mutex); std::cout << "消费者被唤醒" << '\n'; consumer_count--; } // 消费数据 T data = _bq.back(); _bq.pop(); _size--; // 消费完数据,唤醒生产者 if(producer_count){ // 为什么要加上这个判断:必须保证现在有生产者在等待 std::cout << "唤醒生产者" << '\n'; _producer_cond.NotifyOne(); } // 解锁 _mutex.unlock(); return data; } // 队列是否为空 bool IsEmpty() { return _bq.empty(); } // 队列是否已满 bool IsFull() { return _size == _cap; } private: std::queue<T> _bq; // 存储数据的队列 int _size; // 队列中数据的个数 int _cap; // 队列容量 Cond _producer_cond; // 生产者条件变量; Cond _consumer_cond; // 消费者条件变量; Mutex _mutex; // 计数器 int producer_count; int consumer_count; }; } #endif Cond.hpp #pragma once #include <iostream> #include <pthread.h> #include "Mutex.hpp" namespace CondModule { using namespace MutexModule; class Cond { public: Cond() { pthread_cond_init(&_cond, nullptr); } ~Cond() { pthread_cond_destroy(&_cond); } // 阻塞等待 void Wait(Mutex& mtx) { pthread_cond_wait(&_cond, mtx.getLockPtr()); } // 随机唤醒一个 void NotifyOne() { pthread_cond_signal(&_cond); } // 广播唤醒所有 void NotifyAll() { pthread_cond_broadcast(&_cond); } private: pthread_cond_t _cond; }; } Mutex.hpp #ifndef _MUTEX_HPP #define _MUTEX_HPP #include <iostream> #include <pthread.h> namespace MutexModule { class Mutex { public: // 禁止拷贝 Mutex(const Mutex&) = delete; Mutex& operator=(const Mutex&) = delete; Mutex() { pthread_mutex_init(&_lock, nullptr); } ~Mutex() { pthread_mutex_destroy(&_lock); } // 加锁 void lock() { pthread_mutex_lock(&_lock); } // 解锁 void unlock() { pthread_mutex_unlock(&_lock); } // 获取锁 pthread_mutex_t *getLockPtr() { return &_lock; } private: pthread_mutex_t _lock; }; // 锁保护类 class LockGuard { public: LockGuard(Mutex &mtx) : _mtx(mtx) { _mtx.lock(); } ~LockGuard() { _mtx.unlock(); } private: Mutex& _mtx; // 因为是保护某个已存在的锁,所以这里不是创建锁,而是引用 }; } #endif 8. BlockQueue 的使用:构建任务队列 Main.cc #include "BlockQueue.hpp" #include "Task.hpp" #include <unistd.h> #include <stdlib.h> #include <functional> using namespace BlockingQueueModule; using task_t = std::function<void()>; // 打印任务 void Task_Print() { std::cout << "这是一个打印任务" << std::endl; } // 拷贝任务 void Task_Copy() { std::cout << "这是一个拷贝任务" << std::endl; } void *Producer(void *args) { BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args); while (true) { // 任务队列(funtion) task_t task = Task_Print; bq->Equeue(task); std::cout << "--------------生产者发布任务...---------------" << '\n'; } } void *Consumer(void *args) { BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args); while (true) { sleep(2); // 任务队列 task_t task = bq->Pop(); std::cout << "--------------消费者获取并执行任务...---------------" << '\n'; task(); } } int main() { BlockQueue<task_t> bq; // 创建生产者线程: 1 个 pthread_t _tid1; pthread_create(&_tid1, nullptr, Producer, &bq); // 创建消费者线程: 2 个 pthread_t _tid10, _tid11; pthread_create(&_tid10, nullptr, Consumer, &bq); pthread_create(&_tid11, nullptr, Consumer, &bq); // 回收线程 pthread_join(_tid1, nullptr); pthread_join(_tid10, nullptr); pthread_join(_tid11, nullptr); return 0; }

运行结果如下:

标签:

【Linux系统】生产者消费者模型:基于阻塞队列BlockingQueue由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【Linux系统】生产者消费者模型:基于阻塞队列BlockingQueue