2012-03-30 68 views
2

我有一些代碼,我修改了(幾乎沒有改變它)從C++併發在行動書,所以會期望它的工作 - 只有它不。 我一直試圖做的是實現一個線程安全的隊列,我可以再爲 線程或線程商店後臺作業隊列是這樣的:使用condition_variable與unique_lock導致定期崩潰(GCC 4.7,OSX)

queue.h

#pragma once 
#include "imgproc/i_queue.h" 
#include <memory> 
#include <thread> 
#include <queue> 
#include <mutex> 
#include <condition_variable> 

using namespace std; 

namespace imgproc { 

    /* Blocking, concurrency-safe queue. The method that blocks is pop(), 
    * which makes the current thread wait until there is a value to pop 
    * from the queue. 
    */ 
    template <typename T> 
    struct ConcurrentQueue : public IQueueWriter<T>, public IQueueReader<T> 
    { 
     ConcurrentQueue() {} 
     ConcurrentQueue(const ConcurrentQueue &) = delete; 
     ConcurrentQueue & operator= (const ConcurrentQueue &) = delete; 

     /* Concurrency-safe push to queue. 
     */ 
     virtual void push(shared_ptr<T> val) 
     { 
     lock_guard<mutex> lk(_mutex); 
     _queue.push(val); 
     _cvar.notify_one(); 
     } 

     /* Concurrency-safe check if queue empty. 
     */ 
     virtual const bool empty() const 
     { 
     lock_guard<mutex> lk(_mutex); 
     bool result(_queue.empty()); 
     return result; 
     } 

     /* Waiting, concurrency-safe pop of value. If there are no values in 
     * the queue, then this method blocks the current thread until there 
     * are. 
     */ 
     virtual shared_ptr<T> pop() 
     { 
     unique_lock<mutex> lk(_mutex); 
     _cvar.wait(lk, [ this ] {return ! _queue.empty(); }); 
     auto value(_queue.front()); 
     _queue.pop(); 
     return value; 
     } 

    private: 
     mutable mutex _mutex; 
     queue<shared_ptr<T>> _queue; 
     condition_variable _cvar; 
    }; 

} 

我的理解是,那裏的互斥體應該保護所有訪問隊列的企圖。但是,我有一個測試崩潰在10約1時間:

測試是 - 崩潰 - fragment.cpp

// Should have threads wait until there is a value to pop 
TEST_F(ConcurrentQueueTest, 
     ShouldHaveThreadsWaitUntilThereIsAValueToPop) { 
    int val(-1); 
    thread t1([ this, &val ] { 
     for (uint i(0) ; i < 1000 ; ++i); 
     val = *_r_queue->pop(); 
    }); 
    for (uint i(0) ; i < 1000 ; ++ i) { 
    for (uint j(0) ; j < 1000 ; ++ j); 
    EXPECT_EQ(-1, val); 
    } 
    _w_queue->push(make_shared<int>(27)); 
    t1.join(); 
    EXPECT_EQ(27, val); 
    EXPECT_TRUE(_r_queue->empty()); 
} 

變量_r_queue_w_queue只是接口在同一ConcurrentQueue例如,在這裏。

從癡迷於調試信息的小時數開始,看起來pop()的調用是導致崩潰的原因,當_queue member實例變量爲空時,總是(我見過)。 任何人都可以給我任何意見,我做錯了什麼,在這裏?我已經看到其他職位尋求類似問題的幫助,但他們似乎總是說條件變量是答案 - 我正在嘗試!

或者,也許一些建議,我可以更好地調試,以幫助我解決它? FWIW,我試着手動執行一個while,其中有一個sleep(1),它仍然會定期崩潰,這表明儘管我盡了最大的努力,我仍然得到了競爭條件 - 只有我真的看不到它。

非常感謝任何&所有的幫助,我保證我已經試圖解決這個問題之前打擾你所有。

乾杯, Doug。

+0

嗨 - 有關更多信息,我已將違規代碼提取到單個主文件中。該程序在編譯時應該(我認爲)只是退出,但它實際上掛在t.join()的最後一個調用上。主文件在這裏:https://gist.github.com/2396866 – biot023 2012-04-16 07:09:47

回答

1

通過閱讀https://gist.github.com/2396866我確定這個問題是測試//應該能夠同時彈出值」。兩個線程創建,然後分離,兩者無限期地pop'ing隊列,即使在測試結束這會影響最後的測試(其中的問題似乎是)

對於一個快速的解決辦法是:。

/* ... */ 

{ // Should be able to concurrently pop values 
    for (uint i(0) ; i < 100 ; ++ i) 
    q.push(make_shared<string>("Monty Halfwit")); 

    pair<uint, uint> counts(0, 0); 
    thread t1([ & ] { 
    while (++counts.first != 50) { 
     this_thread::sleep_for(chrono::milliseconds(1)); 
     q.pop(); 
    } 
    }); 

    thread t2([ & ] { 
    while (++counts.second != 50) { 
     this_thread::sleep_for(chrono::milliseconds(1)); 
     q.pop(); 
    } 
    }); 

    t1.detach(); 
    t2.detach(); 

/* ... */ 

這將使當他們彈出每50串線程死亡。