多线程生产者/消费者
问题描述:
我在写一个基本的生产者/消费者线程代码。我得到了大部分的工作,但我有一个问题与兰特()。或者,也许我的问题比rand()更深,而rand只是冰山一角。我不想做任何太复杂的事情(无法运行或等待)。多线程生产者/消费者
我有一个全局的整数作为缓冲区。我允许用户输入大小和运行时间的限制。 我让计数器成为一个静态全局变量。 这是我的制片人:
DWORD WINAPI producer(LPVOID n)
{
cout << "\nPRODUCER:The producer is running right now" << endl;
int size = (int)n;
int num = rand()%10;// this is for making item.
while (buffer.size() > size)
{
}
buffer.push_back(num);
counter++;
return (DWORD)n;
}
这是我consumer--
DWORD WINAPI consumer(LPVOID n)
{
cout << "\nCONSUMER:The consumer is running right now" << endl;
while (buffer.empty())
{ }
int item= buffer.front();
cout << "\nCONSUMER:The consumer ate" << item << endl;
counter++;
return (DWORD)n;
}
在main-
while (counter < l)
{
hThreads[0] = CreateThread(NULL, 0, producer, (LPVOID)s, NULL, &id[0]);
hThreads[1] = CreateThread(NULL, 0, consumer, (LPVOID)l, NULL, &id[1]);
waiter = WaitForMultipleObjects(MAX_THREADS, hThreads, TRUE, INFINITE);
}
for (int i = 0; i < MAX_THREADS; i++) {
CloseHandle(hThreads[i]);
}
所以每次只产生1个。 Srand也没有工作。但它运行正确的次数。
编辑--- 所以我固定的生产和消费,以应对竞争条件:
DWORD WINAPI producer(LPVOID s)
{
WaitForSingleObject(Empty, INFINITE);
WaitForSingleObject(Mutex, INFINITE);
cout << "\nPRODUCER...." << endl;
int size = (int)s;
srand(size);
int in = rand() % 10;
cout << "THIS IS IN:::" << in << endl;
while (buffer.size() == size)
{
ReleaseMutex(Mutex);
}
buffer.push_back(in);
counter++;
cout << "\nThe producer produces " << buffer.front() << endl;
ReleaseMutex(Mutex);
ReleaseSemaphore(Full, 1, NULL);
return (DWORD)s;
}
DWORD WINAPI consumer(LPVOID l)
{
WaitForSingleObject(Full, INFINITE);
WaitForSingleObject(Mutex, INFINITE);
cout << "\nCONSUMER...." << endl;
while (buffer.empty())
{
ReleaseMutex(Mutex);
}
int out = buffer.front();
counter++;
ReleaseMutex(Mutex);
ReleaseSemaphore(Empty, 1, NULL);
return (DWORD)l;
}
但随意的事情仍保持演戏了。它只会一直重复生成一个数字(即使播种时也是如此)。
答
是的,创建(并销毁)一个线程来创建或处理一个数字是没有意义的 - 额外的开销不值得。加上你的代码(就像)有一些非常明显的错误或误解。它们是:
- 在主线程中创建工作线程(在while(){}循环中),但只在最后一次销毁它们,那就是只销毁最后一个循环中创建的句柄。
- 正如我的消息所述,srand()被调用来生成每个数字,并始终使用相同的初始种子,因此获得相同的数字是正常的。
- while()循环检查缓冲区是空还是满是没有意义的,并且不应释放互斥锁。
- 对
counter
变量的操作可能是错误的。生产者和消费者线程都增加它,主线程使用它来确定生成/打印的数量。 - 在您的初始代码片段中,
counter
是一个全局变量,其上运行着多个线程,因此您应该以线程安全的方式读取或修改它,而不是以这种方式。您应该使用一些锁定机制,如关键部分或互锁变量访问。
我的建议将是创建一个生产者线程(产生的所有数字)和一个消费者线程(打印所有数),通过缓冲器传送。为此,您将需要以下项目(您已经实现了其中的大部分):
- 一个“完整”信号量,计算缓冲区中的数字,最初为0。
- 一个互补的“Empty”信号量,计算缓冲区中的空项目,最初设置为缓冲区大小。这些信号量的“总和”当然总是等于缓冲区大小。
- 用于访问缓冲区的互斥锁(或临界区)。
- 一个全局变量,用于告诉线程是否退出。
我在这里发布以下代码示例。不知道是否他们的工作,你可能需要修改或进行调试,但是这仅仅是展示的概念:
// Global
#define MAX_BUF 5
BOOL bExit = FALSE;
// Main Thread
Empty = CreateSemaphore(NULL, MAX_BUF, MAX_BUF, NULL);
Full = CreateSemaphore(NULL, 0, MAX_BUF, NULL);
.
.
hThreads[0] = CreateThread(NULL, 0, producer, (LPVOID)l, NULL, &id[0]);
hThreads[1] = CreateThread(NULL, 0, consumer, (LPVOID)l, NULL, &id[1]);
waiter = WaitForMultipleObjects(MAX_THREADS, hThreads, TRUE, INFINITE);
for (int i = 0; i < MAX_THREADS; i++)
CloseHandle(hThreads[i]);
DWORD WINAPI producer(LPVOID nCount)
{
int nItems = (int)nCount;
// Initialize rand() seed - each run will be generating a different sequence
srand(GetTickCount()); // May need to AND GetTickCount() with RAND_MAX ???
// Generate nCount numbers
for (int i = 0; i < nItems; i++)
{
if (bExit) return 9; // Aborted
WaitForSingleObject(Empty, INFINITE); // Wait until at least one item empty
// Lock the buffer and add an item
WaitForSingleObject(Mutex, INFINITE); // Could be EnterCriticalSection() instead
if (buffer.size() >= MAX_BUF)
{
cout << "\nInternal Error: Buffer-full Check Failed!" << endl;
bExit = TRUE; // Tell all threads to exit
ReleaseMutex(Mutex);
return 1; // Exit with Error
}
int in = rand() % 10;
buffer.push_back(in);
cout << "The Producer generated: " << in << endl;
ReleaseMutex(Mutex); // Could be LeaveCriticalSection() instead
ReleaseSemaphore(Full, 1, NULL); // 1 item added, update semaphore
}
cout << "\nThe PRODUCER produced " << nItems << " items." << endl;
return 0; // OK
}
DWORD WINAPI consumer(LPVOID nCount)
{
int nItems = (int)nCount;
// Generate nCount numbers
for (int i = 0; i < nItems; i++)
{
if (bExit) return 9; // Aborted
WaitForSingleObject(Full, INFINITE); // Wait until at least one item in buffer
// Lock the buffer and get an item
WaitForSingleObject(Mutex, INFINITE); // Could be EnterCriticalSection() instead
if (buffer.empty())
{
cout << "\nInternal Error: Buffer-empty Check Failed!" << endl;
bExit = TRUE; // Tell all threads to exit
ReleaseMutex(Mutex);
return 2; // Exit with Error
}
int out = buffer.front();
buffer.erase(buffer.begin()); // Remove item from the list
cout << "The Consumer ate: " << out << endl;
ReleaseMutex(Mutex); // Could be LeaveCriticalSection() instead
ReleaseSemaphore(Empty, 1, NULL); // 1 item removed, update semaphore
}
cout << "\nThe CONSUMER consumed " << nItems << " items." << endl;
return 0; // OK
}
注:
- 的
bExit
变量是全球性的访问受到越来越/修改而不是一个线程,但因为这总是在关键部分(互斥锁)内完成的,所以不需要使用另一个线程或互锁变量访问。 - 诊断消息(例如
cout << "The Consumer ate: " << out << endl;
)或任何其他类型的“处理”这些数据(或在其上“工作”)可能已经在释放对象后被放置。这会更高效,将对象提前释放给其他线程。我已经这样做了,以更好地说明测试中的事件序列。 - 如果您将MAX_BUF设置为1,则应该一次生成/打印一个数字,否则无法确定,但生成的项目减去消耗的项目当然不会超过缓冲区大小。
那么同步在哪里呢? – Mysticial
如果消费者没有任何东西先运行,那么生产者就会进入,然后消费者就会从中断。这取决于首先启动哪个 –
缓冲区上存在未受保护的竞争条件。这是未定义的行为。 – Mysticial