2013-10-29 105 views
2

我需要C++中的阻塞隊列,並具有超時功能offer()。該隊列旨在用於多個生產者,一個消費者。當我執行時,我沒有發現任何適合這種需求的現有隊列,所以我自己編寫了它。C++阻塞隊列Segfault w/Boost

我看到segfaults出隊列上的take()方法,但它們是間歇性的。我一直在研究代碼問題,但我沒有看到任何看起來有問題的東西。

我想知道如果:

  • 有是不可靠的這種現有的庫,我應該 使用(升壓或頭只首選)。
  • 任何人都看到我需要修復的代碼中有任何明顯的缺陷。

這裏是標題:

class BlockingQueue 
{ 
    public: 
     BlockingQueue(unsigned int capacity) : capacity(capacity) { }; 
     bool offer(const MyType & myType, unsigned int timeoutMillis); 
     MyType take(); 
     void put(const MyType & myType); 
     unsigned int getCapacity(); 
     unsigned int getCount(); 

    private: 
     std::deque<MyType> queue; 
     unsigned int capacity; 
}; 

和相關的實現:

boost::condition_variable cond; 
boost::mutex mut; 

bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis) 
{ 
    Timer timer; 

    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    // We use a while loop here because the monitor may have woken up because 
    // another producer did a PulseAll. In that case, the queue may not have 
    // room, so we need to re-check and re-wait if that is the case. 
    // We use an external stopwatch to stop the madness if we have taken too long. 
    while (queue.size() >= this->capacity) 
    { 
     int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds()); 

     if (monitorTimeout <= 0) 
     { 
      return false; 
     } 

     if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis))) 
     { 
      return false; 
     } 
    } 

    cond.notify_all(); 

    queue.push_back(myType); 

    return true; 
} 

void BlockingQueue::put(const MyType & myType) 
{ 
    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    // We use a while loop here because the monitor may have woken up because 
    // another producer did a PulseAll. In that case, the queue may not have 
    // room, so we need to re-check and re-wait if that is the case. 
    // We use an external stopwatch to stop the madness if we have taken too long. 
    while (queue.size() >= this->capacity) 
    { 
     cond.wait(lock); 
    } 

    cond.notify_all(); 

    queue.push_back(myType); 
} 

MyType BlockingQueue::take() 
{ 
    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    while (queue.size() == 0) 
    { 
     cond.wait(lock); 
    } 

    cond.notify_one(); 

    MyType myType = this->queue.front(); 

    this->queue.pop_front(); 

    return myType; 
} 

unsigned int BlockingQueue::getCapacity() 
{ 
    return this->capacity; 
} 

unsigned int BlockingQueue::getCount() 
{ 
    return this->queue.size(); 
} 

是的,我沒有使用模板實現類 - 這是列表中的下: )

任何幫助,非常感謝。線程問題很難確定。

Ben

+0

你可以請示*你如何使用這個課程?特別是您打電話給'take'。請嘗試製作一個[簡單的可編譯示例](http://sscce.org/),它將顯示此行爲。 –

+0

你的「MyType」如何被複制?這是一個微不足道的POD結構嗎? –

+0

它究竟在哪條線上拋出? –

回答

0

爲什麼是cond和mut全局變量?我希望他們是你的BlockingQueue對象的成員。我不知道還有什麼東西可以觸及這些東西,但那裏可能存在問題。

我也已經實施了ThreadSafeQueue作爲一個更大的項目的一部分:

https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h

這是一個類似的概念,以你的,除了排隊(又名報價)函數是非阻塞因爲基本上沒有最大容量。爲了執行容量,我通常在系統啓動時添加一個N緩衝池,並在運行時傳遞一個消息隊列,這也消除了在運行時分配內存的需要,我認爲這是一件好事(我通常在嵌入式應用上工作)。

池和隊列之間唯一的區別是池在系統初始化時獲得一堆隊列。所以,你有這樣的事情:

ThreadSafeQueue<BufferDataType*> pool; 
ThreadSafeQueue<BufferDataType*> queue; 

void init() 
{ 
    for (int i = 0; i < NUM_BUFS; i++) 
    { 
     pool.enqueue(new BufferDataType); 
    } 
} 

然後,當你想給你做類似如下的消息:

void producerA() 
{ 
    BufferDataType *buf; 
    if (pool.waitDequeue(buf, timeout) == true) 
    { 
     initBufWithMyData(buf); 
     queue.enqueue(buf); 
    } 
} 

這樣的排隊功能是快速和容易的,但如果池空的,那麼你將阻塞,直到有人將緩衝池放回池中。這個想法是,一些其他的線程將被阻塞在隊列中,當他們已經處理將返回緩衝區池如下:

void consumer() 
{ 
    BufferDataType *buf; 
    if (queue.waitDequeue(buf, timeout) == true) 
    { 
     processBufferData(buf); 
     pool.enqueue(buf); 
    } 
} 

反正看看吧,也許會有所幫助。

0

我想你的代碼中的問題是由多個線程修改雙端隊列。看:

  1. 你正在等待從另一個線程cododing;
  2. 然後立即發送一個信號給其他線程,deque在你想要修改之前解鎖;
  3. 然後你修改deque,而其他線程正在考慮deque是allready解鎖,並開始做同樣的事情。

因此,嘗試在修改deque後放置所有cond.notify_*()。 I .: .:

void BlockingQueue::put(const MyType & myType) 
{ 
    boost::unique_lock<boost::mutex> lock(mut); 
    while (queue.size() >= this->capacity) 
    { 
     cond.wait(lock); 
    } 

    queue.push_back(myType); // <- modify first 

    cond.notify_all();  // <- then say to others that deque is free 
} 

爲了更好地理解,我建議閱讀關於pthread_cond_wait()