【Linux线程】阻塞队列环形队列(消费者生产者模型的实现)
- 互联网
- 2025-09-02 21:27:01

目录
前言
1. 阻塞队列
2. 环形队列
总结
前言
了解了线程控制、同步与互斥、以及消费者生产者模型,本篇文章为实践篇,对以上内容的实践,使用阻塞队列和环形队列来实现生产者消费者模型;
1. 阻塞队列在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出;
模型整体结构:
一个生产线程,一个消费线程,生产线程负责在队列中生产,一个线程负责去取进行消费;
队列满的时候,生产线程发生阻塞,不再生产队列为空时,消费线程发生阻塞,不再拿任务;在这个体系中要理清楚:
由谁来通知生产线程和消费线程? 谁来唤醒线程?
消费者生产者体系中只有生产者知道什么时候需要消费,只有消费者知道什么时候生产;
所以唤醒线程只能互相唤醒
整体结构较为简单,借助数据结构Queue来实现:
在此之前,可以依据RAII的思想对mutex进行封装,不需要手动的添加解锁(也可以使用C++库在的锁):
#pragma once #include <pthread.h> class mutex { public: mutex(pthread_mutex_t* lock) :_lock(lock) {} void Lock() { pthread_mutex_lock(_lock); } void UnLock() { pthread_mutex_unlock(_lock); } ~mutex() {} private: pthread_mutex_t* _lock; }; class LockGuard { public: LockGuard(pthread_mutex_t* lock) :_mutex(lock) { _mutex.Lock(); } ~LockGuard() { _mutex.UnLock(); } private: mutex _mutex; };阻塞队列成员设计:
const int defaultcap = 5; template<class T> class BlockQueue { private: std::queue<T> _q; int _capacity; pthread_mutex_t _mutex; pthread_cond_t _p_cond; pthread_cond_t _c_cond; };容量设置阻塞队列的大小,锁控制线程安全、两个条件变量用于控制生产线程与消费线程的同步;
核心接口也就只有两个:Push(生产)、Pop(消费);
void Push(const T& in) //生产者 { LockGuard lockguard(&_mutex); while (IsFull()) { pthread_cond_wait(&_p_cond, &_mutex); } _q.push(in); // 有数据了可以唤醒消费者线程来进行消费 pthread_cond_signal(&_c_cond); } void Pop(T* out) //消费者 { LockGuard lockguard(&_mutex); while (IsEmpty()) { pthread_cond_wait(&_c_cond, &_mutex); } *out = _q.front(); _q.pop(); // 唤醒生产者 pthread_cond_signal(&_p_cond); //唤醒放在释放锁的前边后边都可以 }整体逻辑:
#include <pthread.h> #include <ctime> #include <unistd.h> #include <iostream> #include <pthread.h> #include <queue> #include "LockGuard.hpp" const int defaultcap = 5; template<class T> class BlockQueue { public: BlockQueue(int cap = defaultcap) :_capacity(cap) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_p_cond, nullptr); pthread_cond_init(&_c_cond, nullptr); } bool IsFull() { return _q.size() == _capacity; } bool IsEmpty() { return _q.size() == 0; } void Push(const T& in) //生产者 { LockGuard lockguard(&_mutex); //pthread_mutex_lock(&_mutex); while (IsFull()) { pthread_cond_wait(&_p_cond, &_mutex); } _q.push(in); // if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond); //达到生产水平线就唤醒线程 pthread_cond_signal(&_c_cond);//唤醒线程时如果线程本来就醒着,不会有什么影响 //pthread_mutex_unlock(&_mutex); } void Pop(T* out) //消费者 { LockGuard lockguard(&_mutex); //pthread_mutex_lock(&_mutex); while (IsEmpty()) { pthread_cond_wait(&_c_cond, &_mutex); } *out = _q.front(); _q.pop(); //消费者生产者体系中,只有生产者知道什么时候需要消费 //只有消费者知道什么时候生产 // if(_q.size() < _consumer_water_line) pthread_cond_signal(&_p_cond); pthread_cond_signal(&_p_cond);//唤醒放在释放锁的前边后边都可以 // 在锁内唤醒,线程不会在条件变量上等了,转而会到阻塞到申请锁的队列 //pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_p_cond); pthread_cond_destroy(&_c_cond); } private: std::queue<T> _q; int _capacity; pthread_mutex_t _mutex; pthread_cond_t _p_cond; pthread_cond_t _c_cond; // int _consumer_water_line; // _consumer_water_line = _capacity / 3 * 2 // int _productor_water_line; // _productor_water_line = _capacity / 3 }; void *consumer(void *arg) { BlockQueue<int> *bqp = (BlockQueue<int> *)arg; int data; for (;;) { bqp->Pop(&data); std::cout << "Consume data done : " << data << std::endl; } } // more faster void *producter(void *arg) { BlockQueue<int> *bqp = (BlockQueue<int> *)arg; srand((unsigned long)time(NULL)); for (;;) { int data = rand() % 1024; bqp->Push(data); std::cout << "Prodoct data done: " << data << std::endl; sleep(1); } } int main() { BlockQueue<int> bq; pthread_t c, p; pthread_create(&c, NULL, consumer, (void *)&bq); pthread_create(&p, NULL, producter, (void *)&bq); pthread_join(c, NULL); pthread_join(p, NULL); return 0; } 2. 环形队列阻塞队列的方法也有部分欠缺,比如:把资源放到队列中,使用互斥锁一次就只能一个线程访问这个队列;这也会降低效率;这时就可以使用信号量来解决;
使用互斥锁时,把队列看成一个整体来访问,使用信号量可以完美解决 多个线程同时访问队列的不同区域;
基于信号量实现环形队列:
信号量本质就是一个计数器申请信号量本质是预定资源PV操作是原子性的场景分析:
生产者打满这个队列后就不能再生产了(会覆盖原有资源)消费者不能超过生产者生产者和消费者指向同一位置的两种情况:
队列为空(只能让生产者跑)队列为满(只能让消费者跑)也就是说我们需要局部维持互斥和同步
对资源的描述与认识:空间-->p,数据-->d 所以我们需要两个信号量:
p: _space_sem Nc: _data_sem 0生产者生产:
P(_space_sem)// _space_sem 减减
// 生产数据/任务
V(_data_sem)// _data_sem 加加
消费者消费:
P(_data_sem)// _data_sem 减减
// 生产数据/任务
V(_space_sem )// _space_sem 加加
完整代码:
#include <pthread.h> #include <ctime> #include <unistd.h> #include <iostream> #include <pthread.h> #include <queue> #include "LockGuard.hpp" #include <vector> #include <stdlib.h> #include <semaphore.h> #define NUM 16 class RingQueue { private: std::vector<int> q; int cap; // 队列容量 sem_t data_sem; // 数据的数量 sem_t space_sem; // 空余空间数量 int consume_step; // 消费偏移量 int product_step; // 生产偏移量 public: RingQueue(int _cap = NUM) : q(_cap), cap(_cap) { sem_init(&data_sem, 0, 0); sem_init(&space_sem, 0, cap); consume_step = 0; product_step = 0; } // 生产 void PutData(const int &data) { sem_wait(&space_sem); // P q[consume_step] = data; consume_step++; consume_step %= cap; sem_post(&data_sem); // V } // 消费 void GetData(int &data) { sem_wait(&data_sem); data = q[product_step]; product_step++; product_step %= cap; sem_post(&space_sem); } ~RingQueue() { sem_destroy(&data_sem); sem_destroy(&space_sem); } }; void *consumer(void *arg) { RingQueue *rqp = (RingQueue *)arg; int data; for (;;) { rqp->GetData(data); std::cout << "Consume data done : " << data << std::endl; sleep(1); } } void *producter(void *arg) { RingQueue *rqp = (RingQueue *)arg; srand((unsigned long)time(NULL)); for (;;) { int data = rand() % 1024; rqp->PutData(data); std::cout << "Prodoct data done: " << data << std::endl; // sleep(1); } } int main() { RingQueue rq; pthread_t c, p; pthread_create(&c, NULL, consumer, (void *)&rq); pthread_create(&p, NULL, producter, (void *)&rq); pthread_join(c, NULL); pthread_join(p, NULL); }这是一个简易版本的环形队列,如对信号量或线程控制不太熟悉的伙伴,可以阅读我的这篇文章:
【Linux线程】线程互斥与同步
【Linux线程】线程控制
总结
以上便是本文的全部内容,希望对你有所帮助,感谢阅读!
【Linux线程】阻塞队列环形队列(消费者生产者模型的实现)由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【Linux线程】阻塞队列环形队列(消费者生产者模型的实现)”