生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)
1、消费者与生产者
生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个
公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消
息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此
时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费者
还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。
生产者-消费者模型是指:
1) 生产者进行生产将物品放入仓库,同一时间只能有一个生产者将物品放入仓库,如果仓库满,生产者等待。
2) 消费者从仓库中取出物品,同一时间只能有一个消费者取出物品,如果仓库空,消费者等待;
3) 生产者将物品放入仓库时消费者不能同时取;
4) 消费者取物品时生产者不能放入物品;
总之,就是生产者群体或消费者群体内部是互斥的,两个群体之间是同步的。
为了更好的理解生产者消费者模型,我们自己总结的 “321原则”
“3”代表的是三种关系
生产者与消费者的互斥与同步关系
生产者与生产者的互斥(或竞争)关系
消费者与消费者的互斥(或竞争)关系
“2”代表两种角色
生产者:往交易场所放东西(在计算机中一般都是数据)的人
消费者:从交易场所取东西的人
“1”代表一个交易场所(比如说超市)
所谓交易场所就是内存中具有存储数据的一段有界缓冲区
综上,给出生产者消费者模型的描述:两个进程共享一个缓冲区,一个进程称为生产者向缓冲区中放数据,另一
个称为消费者从缓冲取中取数据,当缓冲区中被放时,生产者进程就必须可进入挂起状态,直到消费者从缓冲中取走
数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
缓冲区的设计:
- 缓冲区是一个先进先出队列。写入模块将信息插入队列;读出模块将信息弹出队列。
- 写入模块与读出模块需要进行信息的协调和同步。
- 对于多线程和多进程的写入或读出模块,写入模块间以及读出模块间需要进行临界区处理。
运行结果:
(2)基于环形队列的生产者消费者模型;:
环形队列是在实际编程极为有用的数据结构, 它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组
织简单。能很快知道队列是否满为空。能以很快速度的来存取数据。 因为有简单高效的原因,甚至在硬件都实现了环
形队列。
环形队列的特点是:不需要进行动态的内存释放和分配,使用固定大小的内存空间反复使用。在实际的队列插入和弹出操作中, 是不断交叉进行的。
环形队列的原则是:
消费者永远跟在生产者之后(不超过);
生产者不能将消费者套一个圈(不套圈);
为空为满时必须保证特定运行。
空状态时,生产者与消费者指向同一地方;不为空、不为满时,消费者在生产者之后;为空时,生产者先运行;
为满时,消费者先运行,生产者被挂起。
下面是基于环形队列的生产者与消费者模型的编写:
运行结果:
(3)基于多线程环形队列的生产者与消费者的模型的编写
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <semaphore.h>
#define SIZE 10
sem_t product_sem;
sem_t consumer_sem;
static int pro=SIZE;
static int con=0;
static int p=0;
static int c=0;
static int d=0;
static int table[SIZE];
void* product_fun(void* val)
{
int i=1;
int index=0;
while(i)
{
sem_wait(&product_sem)
table[p]=i;
pro--;
con++;
printf("which product:%u\n",pthread_self);
printf("product# product success,total:%d\n",i++);
printf("product# you have %d blank source!\n",pro);
printf("product# you have %d data source!\n",con);
for(index=0;index<SIZE;index++)
{
if(table[index]!=0)
printf("array[%d]:%d ",index,table[index]);
}
printf("\n");
sem_post(&consumer_sem);
p=(p+1)%SIZE;
sleep(1);
}
return NULL;
}
void* consumer_fun(void* val)
{
int i=1;
int temp=0;
while(i)
{
sem_wait(&consumer_sem);
if(c!=0)
d=c-1;
temp=table[c];
table[c]=0;
pro++;
con--;
printf("which consumer:%u\n",pthread_self());
printf("consume: %d\n",temp);
printf("consumer# you have %d data source!\n",con);
printf("consumer# you have %d blank source!\n",pro);
sem_post(&product_sem);
c=(c+1)%SIZE;
}
}
void destroy()
{
sem_destroy(&product_sem);
sem_destroy(&consumer_sem);
exit(0);
}
void initsem()
{
signal(2,destroy);
int i=0;
for(i=0;i<SIZE;++i)
table[i]=0;
sem_init(&product_sem,0,SIZE);
sem_init(&consumer_sem,0,0);
}
int main()
{
initsem();
pthread_t product1,product2,consumer1,consumer2;
pthread_create(&product1,NULL,product_fun,NULL);
pthread_create(&product2,NULL,product_fun,NULL);
pthread_create(&consumer1,NULL,consumer_fun,NULL);
pthread_create(&consumer2,NULL,consumer_fun,NULL);
pthread_join(product1,NULL);
pthread_join(product2,NULL);
pthread_join(consumer1,NULL);
pthread_join(consumer2,NULL);
return 0;
}
(4)基于多生产多消费的模型:
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
#define SIZE 64
#define CONSUMER 3//定义生产者的个数
#define PRODUCTER 3//定义消费者的个数
int ring[SIZE];
pthread_mutex_t lock1= PTHREAD_MUTEX_INITIALIZER;//用于完成生产者与生产者之间的互斥
pthread_mutex_t lock2= PTHREAD_MUTEX_INITIALIZER;//用于完成消费者与消费者之间的互斥
sem_t blank_sem;
sem_t data_sem;
void *product( void *arg)
{
int data=0;
static int step=0;
while(1)
{
pthread_mutex_lock(&lock1);//加锁
int data =rand( )%1234;
sem_wait(&blank_sem);//p操作
ring[step]=data;//产生数据
sem_post(&data_sem);//v操作
printf("The product done:%d\n",data);
step++;
step%=SIZE;
pthread_mutex_unlock(&lock1);//解锁
sleep(2);
}
}
void *consume( void *arg)
{
static int step=0;
while(1)
{
pthread_mutex_lock(&lock2);//加锁
int data=-1;
sem_wait(&data_sem);
data=ring[step];
sem_post(&blank_sem);
printf("The consume done:%d\n",data);
step++;
step%=SIZE;
pthread_mutex_unlock(&lock2);//解锁
sleep(2);
}
}
int main( )
{
pthread_t p[PRODUCTER];//创建生产者数组
pthread_t c[CONSUMER];//创建消费者数组
int i=0;
for(i=0;i<PRODUCTER;i++)//创建多生产者线程
{
pthread_create(&p[i],NULL,product,NULL);
}
for(i=0;i<CONSUMER;i++)//创建多消费者线程
{
pthread_create(&c[i],NULL,consume,NULL);
}
sem_init(&blank_sem,0,SIZE);
sem_init(&data_sem,0,0);
for( i=0;i<PRODUCTER;i++)//生产者线程等待
{
pthread_join(p[i],NULL);
}
for( i=0;i<CONSUMER;i++)//消费者线程等待
{
pthread_join(c[i],NULL);
}
sem_destroy(&blank_sem);
sem_destroy(&data_sem);
pthread_mutex_destroy(&lock1);
pthread_mutex_destroy(&lock2);
return 0;
}
运行结果: