C++中的消费者/生产者
这是一个经典的c/p问题,其中一些线程在其他线程读取数据时产生数据。生产者和消费者都共享一个常量大小的缓冲区。如果缓冲区为空,则消费者必须等待,如果缓冲区已满,则制片人必须等待。我正在使用信号量来跟踪全部或空的队列。制片人将减少免费斑点信号量,增加值,并增加填充槽信号量。所以我试图实现一个从生成器函数获取一些数字的程序,然后打印出数字的平均值。通过将此视为生产者 - 消费者问题,我试图节省执行该程序的一些时间。 generateNumber函数在进程中导致一些延迟,所以我想创建一些生成数字的线程,并将它们放入队列中。然后,运行主函数的“主线程”必须从队列中读取并求和,然后求平均值。所以这是我到目前为止有:C++中的消费者/生产者
#include <cstdio>
#include <cstdlib>
#include <time.h>
#include "Thread.h"
#include <queue>
int generateNumber() {
int delayms = rand()/(float) RAND_MAX * 400.f + 200;
int result = rand()/(float) RAND_MAX * 20;
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = delayms * 1000000;
nanosleep(&ts, NULL);
return result; }
struct threadarg {
Semaphore filled(0);
Semaphore empty(n);
std::queue<int> q; };
void* threadfunc(void *arg) {
threadarg *targp = (threadarg *) arg;
threadarg &targ = *targp;
while (targ.empty.value() != 0) {
int val = generateNumber();
targ.empty.dec();
q.push_back(val);
targ.filled.inc(); }
}
int main(int argc, char **argv) {
Thread consumer, producer;
// read the command line arguments
if (argc != 2) {
printf("usage: %s [nums to average]\n", argv[0]);
exit(1); }
int n = atoi(argv[1]);
// Seed random number generator
srand(time(NULL));
}
我现在有点困惑,因为我不知道如何创建的生成数量多生产线(如果q是不充分),而消费者从阅读队列(即如果q不为空)。我不确定要把主要内容放在什么位置。 也在“Thread.h”中,您可以创建一个线程,一个互斥量或一个信号量。该线程具有.run(threadFunc,arg),.join()等方法。可以锁定或解锁互斥锁。信号量方法已经全部用于我的代码中。
您的队列没有同步,因此多个生产者可以同时拨打push_back
,或者同时拨打pop_front
......这会中断。
简单的方法,以使这项工作是使用一个线程安全的队列,这可能是围绕std::queue
你已经有了,再加上一个互斥体的包装。
您可以通过添加一个互斥开始,锁定/解锁它周围的每个正打电话给你std::queue
- 单个消费者认为应该是足够的,对多个消费者你需要融合front()
和pop_front()
成一个单一的同步呼叫。
要让消费者在队列为空时阻塞,可以向包装器添加条件变量。
这应该足以让您在线找到答案 - 下面的示例代码。
template <typename T> class SynchronizedQueue
{
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable condvar_;
typedef std::lock_guard<std::mutex> lock;
typedef std::unique_lock<std::mutex> ulock;
public:
void push(T const &val)
{
lock l(mutex_); // prevents multiple pushes corrupting queue_
bool wake = queue_.empty(); // we may need to wake consumer
queue_.push(val);
if (wake) condvar_.notify_one();
}
T pop()
{
ulock u(mutex_);
while (queue_.empty())
condvar_.wait(u);
// now queue_ is non-empty and we still have the lock
T retval = queue_.front();
queue_.pop();
return retval;
}
};
更换std::mutex
等人与任何原语的 “Thread.h” 给你。
请问你能提供一些样品代码吗? – 2012-02-22 14:24:50
你是什么意思的消费区块倒数第二行? – 2012-02-22 14:26:47
假设您的消费者将调用'pop()'获取下一个结果:如果队列为空,则应该阻塞,直到生产者添加一些东西,然后返回它;示例代码来了。 – Useless 2012-02-22 14:31:27
我会做的是这样的:
- 让隐藏您的队列
- 了一块数据保存到Q,和删除数据块的创建线程安全的存取方法数据类从q(我将使用单个互斥体或访问器的关键部分)
- 处理情况下,一个consumor没有任何数据可用(睡眠)
- 处理q正在成为太满了,生产者需要放慢速度
- 让线程去无可奈何地添加和删除,因为他们生产/消费
还有,记得睡眠加入到每一个线程,否则你会挂在CPU,而不是给线程调度切换上下文并与其他线程/进程共享CPU的好地方。你不需要,但这是一个好习惯。
在管理这样的共享状态,你需要一个条件变量和 互斥。其基本模式是线沿线的一个功能:
ScopedLock l(theMutex);
while (!conditionMet) {
theCondition.wait(theMutex);
}
doWhatever();
theCondition.notify();
在你的情况,我可能会做的条件变量和互斥类的 成员执行队列中。写的 conditionMet
会!queue.full()
,所以你最终的东西 这样的:
ScopedLock l(queue.myMutex);
while (queue.full()) {
queue.myCondition.wait();
}
queue.insert(whatever);
queue.myCondition.notify();
和阅读:
ScopedLock l(queue.myMutex);
while (queue.empty()) {
queue.myCondition.wait();
}
results = queue.extract();
queue.myCondition.notify();
return results;
根据不同的线程接口上,可能有两个notify
功能:通知其中一个(唤醒单线程),并通知所有 (唤醒所有等待的线程);在这种情况下,你需要 通知所有(或者你需要两个条件变量,一个是空间 写,还有一个东西可以读,每个功能等待一个, 但通知其他)。
使用互斥锁保护队列访问,应该是这样。一个'计算机科学101'有界的生产者 - 消费者队列需要两个信号量(管理空闲/空的计数,生产者/消费者等待,就像你已经做的那样),一个mutex/futex/criticalSection保护队列。
我不明白如何用condvars替换信号量和互斥量是非常有帮助的。重点是什么?你如何实施一个有限制的生产者 - 消费者队列,并且在拥有多个生产者/消费者的所有平台上工作的condvars?
嗨丹,你没有接受任何给你的答案。请给社区一些答案以回答你的问题。 – 2012-02-22 14:03:32
我很抱歉,我甚至没有意识到这是一个选项,直到现在!我接受了我以前提出的所有问题的答案。 – 2012-02-22 14:09:31
非常感谢您的回复。然而,它并不是我所面临的代码,我只是不确定在哪里定义什么特别是与消费者。 – 2012-02-22 20:50:36