C++阻塞队列Segfault w/Boost

问题描述:

我需要C++中的阻塞队列,并具有超时功能offer()。该队列旨在用于多个生产者,一个消费者。当我执行时,我没有发现任何适合这种需求的现有队列,所以我自己编写了它。C++阻塞队列Segfault w/Boost

我看到segfaults出队列上的take()方法,但它们是间歇性的。我一直在研究代码问题,但我没有看到任何看起来有问题的东西。

我想知道如果:

  • 有是不可靠的这种现有的库,我应该 使用(升压或头只首选)。
  • 任何人都看到我需要修复的代码中有任何明显的缺陷。

这里是标题:

class BlockingQueue 
{ 
    public: 
     BlockingQueue(unsigned int capacity) : capacity(capacity) { }; 
     bool offer(const MyType & myType, unsigned int timeoutMillis); 
     MyType take(); 
     void put(const MyType & myType); 
     unsigned int getCapacity(); 
     unsigned int getCount(); 

    private: 
     std::deque<MyType> queue; 
     unsigned int capacity; 
}; 

和相关的实现:

boost::condition_variable cond; 
boost::mutex mut; 

bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis) 
{ 
    Timer timer; 

    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    // We use a while loop here because the monitor may have woken up because 
    // another producer did a PulseAll. In that case, the queue may not have 
    // room, so we need to re-check and re-wait if that is the case. 
    // We use an external stopwatch to stop the madness if we have taken too long. 
    while (queue.size() >= this->capacity) 
    { 
     int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds()); 

     if (monitorTimeout <= 0) 
     { 
      return false; 
     } 

     if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis))) 
     { 
      return false; 
     } 
    } 

    cond.notify_all(); 

    queue.push_back(myType); 

    return true; 
} 

void BlockingQueue::put(const MyType & myType) 
{ 
    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    // We use a while loop here because the monitor may have woken up because 
    // another producer did a PulseAll. In that case, the queue may not have 
    // room, so we need to re-check and re-wait if that is the case. 
    // We use an external stopwatch to stop the madness if we have taken too long. 
    while (queue.size() >= this->capacity) 
    { 
     cond.wait(lock); 
    } 

    cond.notify_all(); 

    queue.push_back(myType); 
} 

MyType BlockingQueue::take() 
{ 
    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    while (queue.size() == 0) 
    { 
     cond.wait(lock); 
    } 

    cond.notify_one(); 

    MyType myType = this->queue.front(); 

    this->queue.pop_front(); 

    return myType; 
} 

unsigned int BlockingQueue::getCapacity() 
{ 
    return this->capacity; 
} 

unsigned int BlockingQueue::getCount() 
{ 
    return this->queue.size(); 
} 

是的,我没有使用模板实现类 - 这是列表中的下: )

任何帮助,非常感谢。线程问题很难确定。

Ben

+0

你可以请示*你如何使用这个课程?特别是您打电话给'take'。请尝试制作一个[简单的可编译示例](http://sscce.org/),它将显示此行为。 –

+0

你的“MyType”如何被复制?这是一个微不足道的POD结构吗? –

+0

它究竟在哪条线上抛出? –

为什么是cond和mut全局变量?我希望他们是你的BlockingQueue对象的成员。我不知道还有什么东西可以触及这些东西,但那里可能存在问题。

我也已经实施了ThreadSafeQueue作为一个更大的项目的一部分:

https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h

这是一个类似的概念,以你的,除了排队(又名报价)函数是非阻塞因为基本上没有最大容量。为了执行容量,我通常在系统启动时添加一个N缓冲池,并在运行时传递一个消息队列,这也消除了在运行时分配内存的需要,我认为这是一件好事(我通常在嵌入式应用上工作)。

池和队列之间唯一的区别是池在系统初始化时获得一堆队列。所以,你有这样的事情:

ThreadSafeQueue<BufferDataType*> pool; 
ThreadSafeQueue<BufferDataType*> queue; 

void init() 
{ 
    for (int i = 0; i < NUM_BUFS; i++) 
    { 
     pool.enqueue(new BufferDataType); 
    } 
} 

然后,当你想给你做类似如下的消息:

void producerA() 
{ 
    BufferDataType *buf; 
    if (pool.waitDequeue(buf, timeout) == true) 
    { 
     initBufWithMyData(buf); 
     queue.enqueue(buf); 
    } 
} 

这样的排队功能是快速和容易的,但如果池空的,那么你将阻塞,直到有人将缓冲池放回池中。这个想法是,一些其他的线程将被阻塞在队列中,当他们已经处理将返回缓冲区池如下:

void consumer() 
{ 
    BufferDataType *buf; 
    if (queue.waitDequeue(buf, timeout) == true) 
    { 
     processBufferData(buf); 
     pool.enqueue(buf); 
    } 
} 

反正看看吧,也许会有所帮助。

我想你的代码中的问题是由多个线程修改双端队列。看:

  1. 你正在等待从另一个线程cododing;
  2. 然后立即发送一个信号给其他线程,deque在你想要修改之前解锁;
  3. 然后你修改deque,而其他线程正在考虑deque是allready解锁,并开始做同样的事情。

因此,尝试在修改deque后放置所有cond.notify_*()。 I .: .:

void BlockingQueue::put(const MyType & myType) 
{ 
    boost::unique_lock<boost::mutex> lock(mut); 
    while (queue.size() >= this->capacity) 
    { 
     cond.wait(lock); 
    } 

    queue.push_back(myType); // <- modify first 

    cond.notify_all();  // <- then say to others that deque is free 
} 

为了更好地理解,我建议阅读关于pthread_cond_wait()