C++阻塞队列Segfault w/Boost
我需要C++中的阻塞队列,并具有超时功能offer()
。该队列旨在用于多个生产者,一个消费者。当我执行时,我没有发现任何适合这种需求的现有队列,所以我自己编写了它。C++阻塞队列Segfault w/Boost
我看到segfaults出队列上的take()
方法,但它们是间歇性的。我一直在研究代码问题,但我没有看到任何看起来有问题的东西。
我想知道如果:
- 有是不可靠的这种现有的库,我应该 使用(升压或头只首选)。
- 任何人都看到我需要修复的代码中有任何明显的缺陷。
这里是标题:
class BlockingQueue
{
public:
BlockingQueue(unsigned int capacity) : capacity(capacity) { };
bool offer(const MyType & myType, unsigned int timeoutMillis);
MyType take();
void put(const MyType & myType);
unsigned int getCapacity();
unsigned int getCount();
private:
std::deque<MyType> queue;
unsigned int capacity;
};
和相关的实现:
boost::condition_variable cond;
boost::mutex mut;
bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis)
{
Timer timer;
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
// We use a while loop here because the monitor may have woken up because
// another producer did a PulseAll. In that case, the queue may not have
// room, so we need to re-check and re-wait if that is the case.
// We use an external stopwatch to stop the madness if we have taken too long.
while (queue.size() >= this->capacity)
{
int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds());
if (monitorTimeout <= 0)
{
return false;
}
if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis)))
{
return false;
}
}
cond.notify_all();
queue.push_back(myType);
return true;
}
void BlockingQueue::put(const MyType & myType)
{
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
// We use a while loop here because the monitor may have woken up because
// another producer did a PulseAll. In that case, the queue may not have
// room, so we need to re-check and re-wait if that is the case.
// We use an external stopwatch to stop the madness if we have taken too long.
while (queue.size() >= this->capacity)
{
cond.wait(lock);
}
cond.notify_all();
queue.push_back(myType);
}
MyType BlockingQueue::take()
{
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
while (queue.size() == 0)
{
cond.wait(lock);
}
cond.notify_one();
MyType myType = this->queue.front();
this->queue.pop_front();
return myType;
}
unsigned int BlockingQueue::getCapacity()
{
return this->capacity;
}
unsigned int BlockingQueue::getCount()
{
return this->queue.size();
}
是的,我没有使用模板实现类 - 这是列表中的下: )
任何帮助,非常感谢。线程问题很难确定。
Ben
为什么是cond和mut全局变量?我希望他们是你的BlockingQueue对象的成员。我不知道还有什么东西可以触及这些东西,但那里可能存在问题。
我也已经实施了ThreadSafeQueue作为一个更大的项目的一部分:
https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h
这是一个类似的概念,以你的,除了排队(又名报价)函数是非阻塞因为基本上没有最大容量。为了执行容量,我通常在系统启动时添加一个N缓冲池,并在运行时传递一个消息队列,这也消除了在运行时分配内存的需要,我认为这是一件好事(我通常在嵌入式应用上工作)。
池和队列之间唯一的区别是池在系统初始化时获得一堆队列。所以,你有这样的事情:
ThreadSafeQueue<BufferDataType*> pool;
ThreadSafeQueue<BufferDataType*> queue;
void init()
{
for (int i = 0; i < NUM_BUFS; i++)
{
pool.enqueue(new BufferDataType);
}
}
然后,当你想给你做类似如下的消息:
void producerA()
{
BufferDataType *buf;
if (pool.waitDequeue(buf, timeout) == true)
{
initBufWithMyData(buf);
queue.enqueue(buf);
}
}
这样的排队功能是快速和容易的,但如果池空的,那么你将阻塞,直到有人将缓冲池放回池中。这个想法是,一些其他的线程将被阻塞在队列中,当他们已经处理将返回缓冲区池如下:
void consumer()
{
BufferDataType *buf;
if (queue.waitDequeue(buf, timeout) == true)
{
processBufferData(buf);
pool.enqueue(buf);
}
}
反正看看吧,也许会有所帮助。
我想你的代码中的问题是由多个线程修改双端队列。看:
- 你正在等待从另一个线程cododing;
- 然后立即发送一个信号给其他线程,deque在你想要修改之前解锁;
- 然后你修改deque,而其他线程正在考虑deque是allready解锁,并开始做同样的事情。
因此,尝试在修改deque后放置所有cond.notify_*()
。 I .: .:
void BlockingQueue::put(const MyType & myType)
{
boost::unique_lock<boost::mutex> lock(mut);
while (queue.size() >= this->capacity)
{
cond.wait(lock);
}
queue.push_back(myType); // <- modify first
cond.notify_all(); // <- then say to others that deque is free
}
为了更好地理解,我建议阅读关于pthread_cond_wait()
。
你可以请示*你如何使用这个课程?特别是您打电话给'take'。请尝试制作一个[简单的可编译示例](http://sscce.org/),它将显示此行为。 –
你的“MyType”如何被复制?这是一个微不足道的POD结构吗? –
它究竟在哪条线上抛出? –