生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)

 1、消费者与生产者

     生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个

公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消

息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此

时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费者

还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。


生产者-消费者模型是指:

1) 生产者进行生产将物品放入仓库,同一时间只能有一个生产者将物品放入仓库,如果仓库满,生产者等待。

2)  消费者从仓库中取出物品,同一时间只能有一个消费者取出物品,如果仓库空,消费者等待;

3) 生产者将物品放入仓库时消费者不能同时取;

4)  消费者取物品时生产者不能放入物品;

总之,就是生产者群体或消费者群体内部是互斥的,两个群体之间是同步的。

 

     为了更好的理解生产者消费者模型,我们自己总结的 321原则
“3”代表的是三种关系

                       生产者与消费者的互斥与同步关系
                       生产者与生产者的互斥(或竞争)关系
                       消费者与消费者的互斥(或竞争)关系
“2”代表两种角色
                        生产者:往交易场所放东西(在计算机中一般都是数据)的人
                        消费者:从交易场所取东西的人
“1”代表一个交易场所(比如说超市)
                        所谓交易场所就是内存中具有存储数据的一段有界缓冲区

      综上,给出生产者消费者模型的描述:两个进程共享一个缓冲区,一个进程称为生产者向缓冲区中放数据,另一

个称为消费者从缓冲取中取数据,当缓冲区中被放时,生产者进程就必须可进入挂起状态,直到消费者从缓冲中取走

数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。


缓冲区的设计:

  • 缓冲区是一个先进先出队列。写入模块将信息插入队列;读出模块将信息弹出队列。
  • 写入模块与读出模块需要进行信息的协调和同步。
  • 对于多线程和多进程的写入或读出模块,写入模块间以及读出模块间需要进行临界区处理。
(2)下面的程序演示了一个基于单链表的生产者消费者模型的例子。

生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)

运行结果:


生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)

  

(2)基于环形队列的生产者消费者模型;:

       环形队列是在实际编程极为有用的数据结构, 它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组

织简单。能很快知道队列是否满为空。能以很快速度的来存取数据。 因为有简单高效的原因,甚至在硬件都实现了环

形队列。

生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)


环形队列的特点是:不需要进行动态的内存释放和分配,使用固定大小的内存空间反复使用。在实际的队列插入和弹出操作中, 是不断交叉进行的。

环形队列的原则是:
消费者永远跟在生产者之后(不超过);
生产者不能将消费者套一个圈(不套圈);
为空为满时必须保证特定运行。
空状态时,生产者与消费者指向同一地方;不为空、不为满时,消费者在生产者之后;为空时,生产者先运行;
为满时,消费者先运行,生产者被挂起。

下面是基于环形队列的生产者与消费者模型的编写:

生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)

运行结果:

生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)




(3)基于多线程环形队列的生产者与消费者的模型的编写

#include <stdio.h>

#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <semaphore.h>
#define SIZE 10
sem_t product_sem;
sem_t consumer_sem;
static int pro=SIZE;
static int con=0;
static int p=0;
static int c=0;
static int d=0;
static int table[SIZE];
void* product_fun(void* val)
{
    int i=1;
    int index=0;
    while(i)
    {
        sem_wait(&product_sem)
        table[p]=i;
        pro--;
        con++;
        printf("which product:%u\n",pthread_self);
        printf("product#  product success,total:%d\n",i++);
        printf("product#  you have %d blank source!\n",pro);
        printf("product#  you have %d data source!\n",con);
        for(index=0;index<SIZE;index++)
        {
            if(table[index]!=0)
                printf("array[%d]:%d ",index,table[index]);
        }
        printf("\n");
        
        sem_post(&consumer_sem);
        p=(p+1)%SIZE;
        sleep(1);
    }
    return NULL;
}
void* consumer_fun(void* val)
{
    int i=1;
    int temp=0;
    while(i)
    {
        sem_wait(&consumer_sem);
        if(c!=0)
            d=c-1;
        temp=table[c];
        table[c]=0;
        pro++;
        con--;
        printf("which consumer:%u\n",pthread_self());
        printf("consume: %d\n",temp);
        printf("consumer#  you have %d data source!\n",con);
        printf("consumer#  you have %d blank source!\n",pro);
       
        sem_post(&product_sem);
        c=(c+1)%SIZE;
    }
}
void destroy()
{
    sem_destroy(&product_sem);
    sem_destroy(&consumer_sem);
    exit(0);
}
void initsem()
{
    signal(2,destroy);
    int i=0;
    for(i=0;i<SIZE;++i)
        table[i]=0;
    sem_init(&product_sem,0,SIZE);
    sem_init(&consumer_sem,0,0);
}
int main()
{
    initsem();
    pthread_t product1,product2,consumer1,consumer2;
    pthread_create(&product1,NULL,product_fun,NULL);
    pthread_create(&product2,NULL,product_fun,NULL);
    pthread_create(&consumer1,NULL,consumer_fun,NULL);
    pthread_create(&consumer2,NULL,consumer_fun,NULL);
    pthread_join(product1,NULL);
    pthread_join(product2,NULL);
    pthread_join(consumer1,NULL);
    pthread_join(consumer2,NULL);
    return 0;
}


(4)基于多生产多消费的模型:


#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>

#define SIZE 64
#define CONSUMER 3//定义生产者的个数
#define PRODUCTER 3//定义消费者的个数

int ring[SIZE];

pthread_mutex_t lock1= PTHREAD_MUTEX_INITIALIZER;//用于完成生产者与生产者之间的互斥
pthread_mutex_t lock2= PTHREAD_MUTEX_INITIALIZER;//用于完成消费者与消费者之间的互斥

 sem_t blank_sem;
 sem_t data_sem;

void *product( void *arg)
{
    int data=0;
    static int step=0;
    while(1)
    {
        pthread_mutex_lock(&lock1);//加锁
        int data =rand( )%1234;
        sem_wait(&blank_sem);//p操作
        ring[step]=data;//产生数据
        sem_post(&data_sem);//v操作
        printf("The product done:%d\n",data);
        step++;
        step%=SIZE;
        pthread_mutex_unlock(&lock1);//解锁
        sleep(2); 
    }
}
void *consume( void *arg)
{
     static int step=0;
    while(1)
    { 
        pthread_mutex_lock(&lock2);//加锁
        int data=-1;
        sem_wait(&data_sem);
        data=ring[step];
        sem_post(&blank_sem);
        printf("The consume done:%d\n",data);
        step++;
        step%=SIZE;
        pthread_mutex_unlock(&lock2);//解锁
        sleep(2);
    }
}
int main( )
{
    pthread_t p[PRODUCTER];//创建生产者数组
    pthread_t c[CONSUMER];//创建消费者数组

    int i=0;
    for(i=0;i<PRODUCTER;i++)//创建多生产者线程
    {
       pthread_create(&p[i],NULL,product,NULL);
    }
    for(i=0;i<CONSUMER;i++)//创建多消费者线程
    {
     pthread_create(&c[i],NULL,consume,NULL);
    }

    sem_init(&blank_sem,0,SIZE);
    sem_init(&data_sem,0,0);

    for( i=0;i<PRODUCTER;i++)//生产者线程等待
     {
        pthread_join(p[i],NULL);
     }
    for( i=0;i<CONSUMER;i++)//消费者线程等待
    {
    pthread_join(c[i],NULL);
    }

    sem_destroy(&blank_sem);
    sem_destroy(&data_sem);

    pthread_mutex_destroy(&lock1);
    pthread_mutex_destroy(&lock2);
    return 0;    
}


运行结果:


生产者与消费者模型(基于单链表、环形队列、多线程、多消费多生产)