生产者消费者模型

生产者消费者模型生产者 生产者消费者模型

  • 锁类型 - 挂起等待锁
生产者消费者模型
  • 一种场景
这里指上面的仓库,我们可以用栈,链表等数据结构构造.

  •  两个角色
生产者 - 负责向上面的仓库中生产资源,仓库已满时,需等待到仓库有空为止.
消费者 - 负责消费上面仓库中的资源,仓库为空时,虚等待到生产者向仓库中生产了商品为止.

  • 三种关系
生产者 & 生产者 -> 互斥
消费者 & 消费者 -> 互斥
生产者 & 消费者 -> 同步互斥

代码演示生产者消费者模型

#include <iostream>
using namespace std;
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

pthread_cond_t g_cond;
pthread_mutex_t g_lock;

// 一个场景
// 我们用链表构造
typedef struct ListNode
{
    ListNode* next;
    int data;
    ListNode(int d = -1):next(NULL),data(d)
    {}
}ListNode;

void Init(ListNode* head)
{
    head->next = NULL;
}

void Push(ListNode* head,int data)
{
    ListNode* new_node = new ListNode(data);
    new_node->next = head->next;
    head->next = new_node;
}

int Pop(ListNode* head)
{
    ListNode* to_delete = head->next;
    head->next = to_delete->next;

    int val = to_delete->data;
    delete to_delete;
    return val;
}

// 两种角色
// 生产者
// 消费者

// 三种关系
// 生产者 & 生产者 -> 互斥
// 消费者 & 消费者 -> 互斥
// 生产者 & 消费者 -> 同步互斥

int p = 1;

void* Product(void* arg)
{
    ListNode* head = (ListNode*)arg;
    while(1)
    {
        pthread_mutex_lock(&g_lock);
        printf("Product : %d\n",p);
        Push(head,p++);
        pthread_cond_signal(&g_cond);
        pthread_mutex_unlock(&g_lock);
        usleep(789123);
    }
    return NULL;
}

void* Consume(void* arg)
{
    ListNode* head = (ListNode*)arg;
    while(1)
    {
        pthread_mutex_lock(&g_lock);
        while(head->next == NULL)
        {
            cout<<"consumer wait\n";
            pthread_cond_wait(&g_cond,&g_lock);
        }
        cout<<"wait done\n";

        int val = Pop(head);
        printf("Consume : %d\n",val);

        pthread_mutex_unlock(&g_lock);
        usleep(123456);
    }
    return NULL;
}

int main()
{
    // 初始化互斥锁和条件变量
    pthread_cond_init(&g_cond,NULL);
    pthread_mutex_init(&g_lock,NULL);

    // 公共资源,可以理解为仓库的钥匙
    ListNode head;
    Init(&head);

    // 这里场景设计为 2 个生产者和 2 个消费者
    pthread_t producter[2],consumer[2];
    int i = 0;
    for(; i < 2; ++i)
    {
        pthread_create(&producter[i],NULL,Product,&head);
        pthread_create(&consumer[i],NULL,Consume,&head);
    }

    for(i = 0; i < 2; ++i)
    {
        pthread_join(producter[i],NULL);
        pthread_join(consumer[i],NULL);
    }

    // 销毁互斥锁和条件变量
    pthread_cond_destroy(&g_cond);
    pthread_mutex_destroy(&g_lock);

    return 0;
}

结果演示:
生产者消费者模型

观察结果可以看到
  • 一定是生产者生产处商品,消费者才可以消费,证明生产者和消费者之间是同步互斥的
  • 两个生产者生产的商品的数字不重复,证明生产者和生产者之间是互斥的
  • 两个消费者消费的商品的数字也不重复,证明消费者和消费者之间是互斥的

POSIX信号量

POSIX和SystemV版本的信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源.
但是,POSIX可用于线程间的同步.

初始化信号量
#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);

参数:
    pshared : 0 代表线程间共享, 1 代表进程间共享.
    value : 信号量初始值

销毁信号量
int sem_destroy(sem_t* sem);

等待信号量(P)
这个操作功能同 P 操作,会使信号量的值减 1
int sem_wait(sem_t* sem);

发布信号量(V)
这个操作功能同 V 操作,表示资源使用完毕,可以归还信号量,会使信号量的值加 1
int sem_post(sem_t* sem);

我们将上面的生产者消费者模型改写为POSIX信号量版本:
#include <iostream>
using namespace std;
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

#define MAX 100 // 仓库的最大容量

sem_t data;     // 表示当前空位的资源个数,为完成同步
sem_t blank;    // 表示当前商品的资源个数,为完成同步
sem_t g_block;  // 二元互斥锁,为完成互斥

// 一种场景
// 这里我们使用一个环形的队列
typedef struct queue
{
    int data[MAX];
    int head;
    int tail;
}queue;

void Init(queue* q)
{
    q->head = 0;
    q->tail = 0;
}

void Push(queue* q,int data)
{
    q->data[q->tail++] = data;
    if(q->tail >= MAX)
    {
        q->tail = 0;
    }
}

void Pop(queue* q)
{
    ++q->head;
    if(q->head >= MAX)
    {
        q->head = 0;
    }
}

// 两个角色
// 三种关系
int d = 1;

void* Product(void* arg)
{
    queue* q = (queue*)arg;
    while(1)
    {
        sem_wait(&blank);
        sem_wait(&g_block);
        printf("Product %d\n",d);
        Push(q,d++);
        sem_post(&g_block);
        sem_post(&data);
        usleep(789123);
    }
    return NULL;
}

void* Consume(void* arg)
{
    queue* q = (queue*)arg;
    while(1)
    {
        sem_wait(&data);
        sem_wait(&g_block);
        printf("Consume %d\n",q->data[q->tail-1]);
        Pop(q);
        sem_post(&g_block);
        sem_post(&blank);
        usleep(123456);
    }
    return NULL;
}

int main()
{
    queue q;
    Init(&q);

    sem_init(&data,0,0);    // 初始化为 0 ,表示一开始有 0 个商品
    sem_init(&blank,0,MAX); // 初始化为 MAX, 表示一开始有 MAX 个空位
    sem_init(&g_block,0,1); // 初始化为 1,为了形成一个二元的互斥锁

    pthread_t producter[2],consumer[2];
    int i = 0;
    for(i = 0; i < 2; ++i)
    {
        pthread_create(&producter[i],NULL,Product,&q);
        pthread_create(&consumer[i],NULL,Consume,&q);
    }
    for(i = 0; i < 2; ++i)
    {
        pthread_join(producter[i],NULL);
        pthread_join(consumer[i],NULL);
    }

    sem_destroy(&data);
    sem_destroy(&blank);
    sem_destroy(&g_block);

    return 0;
}

演示结果:
生产者消费者模型

从结果我么也可以看出:
  • 生产者和生产者之间为互斥关系
  • 消费者和消费者之间为互斥关系
  • 生产者和消费者之间为同步互斥关系

以上的环形队列巧妙的将空格也当做一种资源
判断当前生产者是否可以继续生产商品的依据就是当前有没有blank资源
判断消费者是否可以继续消费的依据就是当前是否有data资源
生产者消费者模型