『Linux』生产者与消费者模型
什么是生产者消费者模型?
生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里去,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者之间的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。它与普通队列的区别在于,当队列为空时,从队列取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列中放元素的操作也会被阻塞,直到有元素被从队列中取出(以上操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
生产者消费者模型优点:
解耦合、支持忙闲不均、支持并发。
简记:
一个场所:阻塞队列。
两种角色:生产者和消费者。
三种关系:生产者与生产者之间应具有互斥关系、消费者与消费者之间应具有互斥关系、生产者与消费者之间应具有同步与互斥关系。
生产者与消费者模型的模拟实现
使用互斥量和条件变量模拟实现
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
using std::cout;
using std::endl;
const int n = 4;
int resources = 0;
class BlockQueue{
public:
BlockQueue(int cap = 10) : _capacity(cap){
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond_productor, NULL);
pthread_cond_init(&_cond_consumer, NULL);
}
~BlockQueue(){
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_productor);
pthread_cond_destroy(&_cond_consumer);
}
bool BlockQueuePush(int data){
BlockQueueLock();
while(BlockQueueFull()){
ProductorWait();
}
_queue.push(data);
ConsumerWakeUp();
BlockQueueUnLock();
return true;
}
bool BlockQueuePop(int* data){
BlockQueueLock();
while(BlockQueueEmpty()){
ConsumerWait();
}
*data = _queue.front();
_queue.pop();
ProductorWakeUp();
BlockQueueUnLock();
return true;
}
private:
void BlockQueueLock(){
pthread_mutex_lock(&_mutex);
}
void BlockQueueUnLock(){
pthread_mutex_unlock(&_mutex);
}
bool BlockQueueFull(){
return (_queue.size() == _capacity);
}
bool BlockQueueEmpty(){
return _queue.empty();
}
void ProductorWait(){
pthread_cond_wait(&_cond_productor, &_mutex);
}
void ProductorWakeUp(){
pthread_cond_signal(&_cond_productor);
}
void ConsumerWait(){
pthread_cond_wait(&_cond_consumer, &_mutex);
}
void ConsumerWakeUp(){
pthread_cond_signal(&_cond_consumer);
}
private:
std::queue<int> _queue;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _cond_productor;
pthread_cond_t _cond_consumer;
};
void* thr_productor(void* arg){
BlockQueue* bq = (BlockQueue*)arg;
while(1){
cout << "productor put resources: " << resources << endl;
bq->BlockQueuePush(resources++);
sleep(1);
}
return NULL;
}
void* thr_consumer(void* arg){
BlockQueue* bq = (BlockQueue*)arg;
while(1){
int data;
bq->BlockQueuePop(&data);
cout << "consumer get data: " << data << endl;
sleep(1);
}
return NULL;
}
int main(){
pthread_t ptid[n], ctid[n];
BlockQueue bq;
int i, ret;
for(i = 0; i < n; ++i){
ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&bq);
if(ret != 0){
cout << "productor thread" << i << " create failed!" << endl;
}
}
for(i = 0; i < n; ++i){
ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&bq);
if(ret != 0){
cout << "consumer thread" << i << " create failed!" << endl;
}
}
for(i = 0; i < n; ++i){
pthread_join(ptid[i], NULL);
}
for(i = 0; i < n; ++i){
pthread_join(ctid[i], NULL);
}
return 0;
}
运行结果
[[email protected] safe]$ !g++
g++ productor_and_consumer.cc -o productor_and_consumer -lpthread
[[email protected] safe]$ ./productor_and_consumer
productor put resources: 0
productor put resources: 1
productor put resources: 2
consumer get data: 0
consumer get data: 1
productor put resources: 3
consumer get data: 2
consumer get data: 3
productor put resources: 4
consumer get data: 4
productor put resources: 5
productor put resources: 6
consumer get data: 5
consumer get data: 6
productor put resources: 7
consumer get data: 7
productor put resources: 8
productor put resources: 9
consumer get data: 8
productor put resources: 10
consumer get data: 9
consumer get data: 10
productor put resources: 11
consumer get data: 11
productor put resources: 12
productor put resources: 13
productor put resources: 14
consumer get data: 12
consumer get data: 13
consumer get data: 14
productor put resources: 15
consumer get data: 15
使用POSIX信号量模拟实现
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX信号量可以用于线程间同步。
接口介绍
头文件:semaphore.h
sem_t sem;
定义信号变量。
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem:信号变量。
pshared:选项标志,决定信号量用于进程间同步互斥还是线程间的同步互斥。
0:线程间。
!0:进程间。
value:信号量初始计数。
返回值:成功返回0,失败返回-1,errno被设置。
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
功能:计数-1并等待。
参数:
sem:信号变量。
abs_timeout:等待时间。
返回值:成功返回0,失败返回-1,errno被设置。
注意:如计数<=0,sem_wait阻塞,sem_trywait报错返回,sem_timedwait限时阻塞,超时报错返回。
int sem_post(sem_t *sem);
功能:计数+1并唤醒等待。
参数:
sem:信号变量。
返回值:成功返回0,失败返回-1,errno被设置。
int sem_destroy(sem_t *sem);
参数:
sem:信号变量。
返回值:成功返回0,失败返回-1,errno被设置。
实现代码
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
using std::cout;
using std::endl;
const int n = 4;
int resources = 0;
class BlockQueue{
public:
BlockQueue(int cap = 10):_queue(10), _capacity(cap),
_write_step(0), _read_step(0){
sem_init(&_sem_data, 0, 0);
sem_init(&_sem_idle, 0, cap);
sem_init(&_sem_lock, 0, 1);
}
~BlockQueue(){
sem_destroy(&_sem_data);
sem_destroy(&_sem_idle);
sem_destroy(&_sem_lock);
}
bool BlockQueuePush(int data){
ProductorWait();
BlockQueueLock();
_queue[_write_step] = data;
_write_step = (_write_step + 1) % _capacity;
BlockQueueUnLock();
ConsumerWakeUp();
return true;
}
bool BlockQueuePop(int* data){
ConsumerWait();
BlockQueueLock();
*data = _queue[_read_step];
_read_step = (_read_step + 1) % _capacity;
BlockQueueUnLock();
ProductorWakeUp();
return true;
}
private:
void BlockQueueLock(){
sem_wait(&_sem_lock);
}
void BlockQueueUnLock(){
sem_post(&_sem_lock);
}
void ProductorWait(){
sem_wait(&_sem_idle);
}
void ProductorWakeUp(){
sem_post(&_sem_idle);
}
void ConsumerWait(){
sem_wait(&_sem_data);
}
void ConsumerWakeUp(){
sem_post(&_sem_data);
}
private:
std::vector<int> _queue;
int _capacity;
int _write_step;
int _read_step;
sem_t _sem_data;
sem_t _sem_idle;
sem_t _sem_lock;
};
void* thr_productor(void* arg){
BlockQueue* bq = (BlockQueue*)arg;
while(1){
bq->BlockQueuePush(resources);
cout << "productor put data: " << resources << endl;
++resources;
sleep(1);
}
return NULL;
}
void* thr_consumer(void* arg){
BlockQueue* bq = (BlockQueue*)arg;
while(1){
int data;
bq->BlockQueuePop(&data);
cout << "consumer get data: " << data << endl;
sleep(1);
}
return NULL;
}
int main(){
pthread_t ptid[n], ctid[n];
BlockQueue bq;
int i, ret;
for(i = 0; i < n; ++i){
ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&bq);
if(ret != 0){
cout << "productor thread" << i << " create failed!" << endl;
exit(-1);
}
}
for(i = 0; i < n; ++i){
ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&bq);
if(ret != 0){
cout << "consumer thread" << i << " create failed!" << endl;
exit(-1);
}
}
for(i = 0; i < n; ++i){
pthread_join(ptid[i], NULL);
}
for(i = 0; i < n; ++i){
pthread_join(ctid[i], NULL);
}
return 0;
}
运行结果
[[email protected] safe]$ !g++
g++ sem_productor_consumer.cc -o sem_productor_consumer -lpthread
[[email protected] safe]$ ./sem_productor_consumer
productor put data: 0
productor put data: 1
productor put data: 2
consumer get data: 0
consumer get data: 1
productor put data: 3
consumer get data: 2
consumer get data: 3
productor put data: 4
consumer get data: 4
productor put data: 5
productor put data: 6
consumer get data: 5
consumer get data: 6
productor put data: 7
consumer get data: 7
productor put data: 8
productor put data: 9
productor put data: 10
consumer get data: 8
consumer get data: 9
consumer get data: 10
productor put data: 11
consumer get data: 11
productor put data: 12
productor put data: 13
consumer get data: 12
productor put data: 14
consumer get data: 13
consumer get data: 14
productor put data: 15
consumer get data: 15
整理不易,如有帮助,感谢支持。