2011-02-16 64 views
6

我遇到了pthreads的問題,我認爲我陷入了僵局。我創建了一個我認爲正在工作的阻塞隊列,但是在做了一些更多的測試之後,我發現如果我嘗試並取消阻塞在blocking_queue上的多個線程,我似乎會陷入僵局。C++ pthread阻塞隊列死鎖(我認爲)

阻塞隊列是非常簡單的,看起來像這樣:

template <class T> class Blocking_Queue 
{ 
public: 
    Blocking_Queue() 
    { 
     pthread_mutex_init(&_lock, NULL); 
     pthread_cond_init(&_cond, NULL); 
    } 

    ~Blocking_Queue() 
    { 
     pthread_mutex_destroy(&_lock); 
     pthread_cond_destroy(&_cond); 
    } 

    void put(T t) 
    { 
     pthread_mutex_lock(&_lock); 
     _queue.push(t); 
     pthread_cond_signal(&_cond); 
     pthread_mutex_unlock(&_lock); 
    } 

    T pull() 
    { 
     pthread_mutex_lock(&_lock); 
     while(_queue.empty()) 
     { 
      pthread_cond_wait(&_cond, &_lock); 
     } 

     T t = _queue.front(); 
     _queue.pop(); 

     pthread_mutex_unlock(&_lock); 

     return t; 
    } 

priavte: 
    std::queue<T> _queue; 
    pthread_cond_t _cond; 
    pthread_mutex_t _lock; 
} 

爲了測試,我創建了4個線程,在這個阻塞隊列拉。我向阻塞隊列添加了一些打印語句,並且每個線程都進入了pthread_cond_wait()方法。但是,當我嘗試在每個線程上調用pthread_cancel()和pthread_join()時,程序就會掛起。

我也測試過這隻有一個線程,它完美的作品。

根據文檔,pthread_cond_wait()是一個取消點,因此在這些線程上調用cancel會導致它們停止執行(並且這隻適用於1個線程)。但是,pthread_mutex_lock不是取消點。當pthread_cancel()被調用時,線程會發生什麼事情,取消的線程在終止之前獲取互斥鎖並且不解鎖它,然後當下一個線程被取消時它不能獲得互斥鎖和死鎖?還是有什麼我做錯了。

任何建議將是可愛的。謝謝:)

+0

嘗試使用[helgrind(http://valgrind.org/info/tools.html#helgrind),它是有用的,在過去對我來說發現競賽狀況和僵局。 – Flexo 2011-02-16 16:16:55

+0

取消可能是背信棄義的。請向我們展示更多的邏輯:工作線程的可取消狀態是什麼?什麼清理處理程序?你究竟如何排序調用取消/加入多個線程? – pilcrow 2011-02-16 16:17:46

回答

5

pthread_cancel()是最好的避免。

你可以通過從那裏拋出一個異常來解除阻塞在Blocking_Queue :: pull()上的所有線程。

隊列中的一個弱點是T t = _queue.front();調用可能會引發異常的T的拷貝構造函數,使您將互斥鎖永久鎖定。更好地使用C++作用域鎖。

這裏優美的線程終止的例子:

$ cat test.cc 
#include <boost/thread/mutex.hpp> 
#include <boost/thread/thread.hpp> 
#include <boost/thread/condition_variable.hpp> 
#include <exception> 
#include <list> 
#include <stdio.h> 

struct BlockingQueueTerminate 
    : std::exception 
{}; 

template<class T> 
class BlockingQueue 
{ 
private: 
    boost::mutex mtx_; 
    boost::condition_variable cnd_; 
    std::list<T> q_; 
    unsigned blocked_; 
    bool stop_; 

public: 
    BlockingQueue() 
     : blocked_() 
     , stop_() 
    {} 

    ~BlockingQueue() 
    { 
     this->stop(true); 
    } 

    void stop(bool wait) 
    { 
     // tell threads blocked on BlockingQueue::pull() to leave 
     boost::mutex::scoped_lock lock(mtx_); 
     stop_ = true; 
     cnd_.notify_all(); 

     if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull() 
      while(blocked_) 
       cnd_.wait(lock); 
    } 

    void put(T t) 
    { 
     boost::mutex::scoped_lock lock(mtx_); 
     q_.push_back(t); 
     cnd_.notify_one(); 
    } 

    T pull() 
    { 
     boost::mutex::scoped_lock lock(mtx_); 

     ++blocked_; 
     while(!stop_ && q_.empty()) 
      cnd_.wait(lock); 
     --blocked_; 

     if(stop_) { 
      cnd_.notify_all(); // tell stop() this thread has left 
      throw BlockingQueueTerminate(); 
     } 

     T front = q_.front(); 
     q_.pop_front(); 
     return front; 
    } 
}; 

void sleep_ms(unsigned ms) 
{ 
    // i am using old boost 
    boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(ms)); 
    // with latest one you can do this 
    //boost::thread::sleep(boost::posix_time::milliseconds(10)); 
} 

void thread(int n, BlockingQueue<int>* q) 
try 
{ 
    for(;;) { 
     int m = q->pull(); 
     printf("thread %u: pulled %d\n", n, m); 
     sleep_ms(10); 
    } 
} 
catch(BlockingQueueTerminate&) 
{ 
    printf("thread %u: finished\n", n); 
} 

int main() 
{ 
    BlockingQueue<int> q; 

    // create two threads 
    boost::thread_group tg; 
    tg.create_thread(boost::bind(thread, 1, &q)); 
    tg.create_thread(boost::bind(thread, 2, &q)); 
    for(int i = 1; i < 10; ++i) 
     q.put(i); 
    sleep_ms(100); // let the threads do something 
    q.stop(false); // tell the threads to stop 
    tg.join_all(); // wait till they stop 
} 

$ g++ -pthread -Wall -Wextra -o test -lboost_thread-mt test.cc 

$ ./test 
thread 2: pulled 1 
thread 1: pulled 2 
thread 1: pulled 3 
thread 2: pulled 4 
thread 1: pulled 5 
thread 2: pulled 6 
thread 1: pulled 7 
thread 2: pulled 8 
thread 1: pulled 9 
thread 2: finished 
thread 1: finished 
1

我不完全熟悉pthread_cancel() - 我更喜歡合作終止。

pthread_cancel()不會讓你的互斥體鎖定嗎?我想你需要用取消處理程序清理。

+0

理論上,當調用pthread_cond_wait()時,互斥鎖應該被釋放(並且,因爲多個線程使它成爲pthread_cond_wait()語句)。然而,當pthread_cancel被調用時,它看起來像互斥體是必需的,至少這是我的解釋。 – vimalloc 2011-02-16 16:20:19

1

我有類似的經驗與pthread_cond_wait()/ pthread_cancel()。由於某種原因,線程返回後,我仍然遇到鎖定問題,並且無法將其解鎖,因爲您必須在鎖定的同一線程中解鎖。我在執行pthread_mutex_destroy()時注意到了這些錯誤,因爲我有一個生產者,單個消費者情況,所以沒有發生死鎖。

pthread_cond_wait()應該在返回時鎖定互斥鎖,並且這可能發生了,但是由於我們強制取消了線程,最終的解鎖沒有經過。爲了安全起見,我通常儘量避免使用pthread_cancel(),因爲有些平臺甚至不支持這一點。你可以使用volatile bool或atomics來檢查線程是否應該關閉。那樣,互斥體也將被幹淨地處理。