2016-09-15 145 views
0

我使用這個類的生產者 - 消費者安裝在C++:C++線程安全的隊列關機

#pragma once 

#include <queue> 
#include <mutex> 
#include <condition_variable> 
#include <memory> 
#include <atomic> 

template <typename T> class SafeQueue 
{ 
public: 
    SafeQueue() : 
    _shutdown(false) 
    { 

    } 

    void Enqueue(T item) 
    { 
     std::unique_lock<std::mutex> lock(_queue_mutex); 
     bool was_empty = _queue.empty(); 
     _queue.push(std::move(item)); 
     lock.unlock(); 

     if (was_empty) 
      _condition_variable.notify_one(); 
    } 

    bool Dequeue(T& item) 
    { 
     std::unique_lock<std::mutex> lock(_queue_mutex); 

     while (!_shutdown && _queue.empty()) 
      _condition_variable.wait(lock); 

     if(!_shutdown) 
     { 
      item = std::move(_queue.front()); 
      _queue.pop(); 

      return true; 
     } 

     return false; 
    } 

    bool IsEmpty() 
    { 
     std::lock_guard<std::mutex> lock(_queue_mutex); 
     return _queue.empty(); 
    } 

    void Shutdown() 
    { 
     _shutdown = true; 
     _condition_variable.notify_all(); 
    } 

private: 
    std::mutex _queue_mutex; 
    std::condition_variable _condition_variable; 
    std::queue<T> _queue; 
    std::atomic<bool> _shutdown; 
}; 

我用它是這樣的:

class Producer 
{ 
public: 
    Producer() : 
     _running(true), 
     _t(std::bind(&Producer::ProduceThread, this)) 
    { } 

    ~Producer() 
    { 
     _running = false; 
     _incoming_packets.Shutdown(); 
     _t.join(); 
    } 

    SafeQueue<Packet> _incoming_packets; 

private: 
    void ProduceThread() 
    { 
     while(_running) 
     { 
      Packet p = GetNewPacket(); 
      _incoming_packets.Enqueue(p); 
     } 
    } 

    std::atomic<bool> _running; 
    std::thread _t; 
} 

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 

    ~Consumer() 
    { 
     _t.join(); 
    } 

private: 
    void WorkerThread() 
    { 
     Packet p; 

     while(producer->_incoming_packets.Dequeue(p)) 
      ProcessPacket(p); 
    } 

    std::thread _t; 
    Producer* _producer; 
} 

這工作的時間。但在同時,一旦當我刪除生產者(並引起它的析構函數調用SafeQueue::Shutdown,在_t.join()塊永遠

我的猜測是,這個問題是這裏(SafeQueue::Dequeue):

while (!_shutdown && _queue.empty()) 
     _condition_variable.wait(lock); 

SafeQueue::Shutdown從線#1被調用,而線#2檢查完_Shutdown但在此之前它執行_condition_variable.wait(lock),因此「缺失」的notify_all()。能這樣呢?

如果這是問題,解決問題的最佳方法是什麼?

+0

覆蓋您是否打開了最高級別的警告?您的消費者類中有一個微妙的錯誤。您的數據成員的順序以及您打算在構造函數衝突中初始化它們的順序....檢查它。該線程將在分配_producer之前創建。 – WhiZTiM

+0

同樣,您錯誤地使用了條件變量......它不應該像這樣處於while循環中。它有一個允許這樣的測試的過載 – WhiZTiM

+0

@WhiZTiM我知道它有這樣的過載,但它相當於一個while循環:http://en.cppreference.com/w/cpp/thread/condition_variable/wait。 – UnTraDe

回答

1

由於SafeQueue對象爲生產者所有,因此當生產者完成時,刪除生產者會導致通知的消費者和被刪除的SafeQueue之間的競爭條件被刪除。

我建議共享資​​源既不是由生產者也不是由消費者擁有,而是作爲引用傳遞給每個構造器。

更改生產者和消費者構造函數;

Producer(SafeQueue<Packet> & queue) : 
    _running(false), _incoming_packets(queue) {} 


Consumer(SafeQueue<Packet> & queue) : 
    _running(false), _incoming_packets(queue) {} 

以這種方式使用您的實例;

SafeQueue<Packet> queue; 
Producer producer(queue); 
Consumer consumer(queue); 

...do stuff... 

queue.shutdown(); 

這也解決了您在Consumer類中與Producer類緊密耦合的糟糕設計問題。

另外,在析構函數中殺死和連接線程可能是個壞主意,就像你爲〜Producer做的那樣。最好爲每個線程類添加一個Shutdown()方法,並明確地調用它們;

producer.shutdown(); 
consumer.shutdown(); 
queue.shutdown(); 

關機順序並不重要,除非你是擔心失去仍在隊列中,當你停止消費未處理的分組。

+0

爲什麼在解構器中加入線程是一個糟糕的設計?它與RAII不衝突嗎? – UnTraDe

+0

@UnTraDe,創建和銷燬將是RAII,啓動和停止是程序控制。但是,join可以拋出異常,這意味着正確的處理需要更多的析構函數中的邏輯,這絕對與RAII衝突。 – CAB

0

在您的SafeQueue::Dequeue中,您可能正在使用std::condition_variable錯誤的方式...更改此:

bool Dequeue(T& item) 
{ 
    std::unique_lock<std::mutex> lock(_queue_mutex); 

    while (!_shutdown && _queue.empty()) 
     _condition_variable.wait(lock); 
    if(!_shutdown) 
    { 
     item = std::move(_queue.front()); 
     _queue.pop(); 
     return true; 
    } 
    return false; 
} 

bool Dequeue(T& item) 
{ 
    std::unique_lock<std::mutex> lock(_queue_mutex); 
    _condition_variable.wait(lock, []{ return _shutdown || !_queue.empty() }); 
    if(!_shutdown) 
    { 
     item = std::move(_queue.front()); 
     _queue.pop(); 
     return true; 
    } 

    return false; 
} 

其次,Consumer數據成員的初始化順序是不正確的有關於它的構造

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 
    ...... 
    // _t will be constructed first, regardless of your constructor initializer list 
    // Meaning, the thread can even start running using an unintialized _producer 
    std::thread _t;  
    Producer* _producer; 
} 

應該重新排序到:

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 
    ...... 
    Producer* _producer; 
    std::thread _t;  
} 

您的問題的另一部分由CAB's answer

+0

原始'while(!pred)condvar.wait(lock);'代碼是正確的,你的改變是錯誤的,因爲當傳遞一個謂詞時,等待謂詞爲真時停止,所以它應該是'_shutdown || !_queue.empty()' – stefaanv

+0

@stefaanv,Oopsie。糾正。非常感謝 – WhiZTiM