2012-01-30 19 views
0

我想使用boost線程庫實現一個帶有條件變量的同步隊列,就像這裏的示例 - >(ImplementingThreadSafeQueue)。boost在C++條件變量中的同步隊列不會通知其他線程上的等待類方法

背景/目的:我正在撰寫windows服務作爲高級設計項目的一部分。在整個服務中,我希望有各種級別的日誌記錄(包括文件和Windows事件查看器),還使用我自己的「CreateTimerQueueTimer」函數的「EventTimer」包裝來創建定時事件,例如服務報告心跳。我的想法是將消息對象推送到同步隊列中,並讓記錄類在其自己的線程上觀察隊列,等待執行各種日誌記錄任務。爲了簡單起見,我現在只用字符串進行測試。

問題:記錄器線程在屬於記錄類的方法上運行,以從隊列中獲取工作項。如果我從類外部將東西推到隊列上,可以從EventTimer線程或甚至主線程中說出,記錄器永遠不會收到隊列中新項目的通知。但是,如果我創建屬於記錄器類的兩個線程並使用其中一個線程將某些內容推送到隊列中,則記錄器將看到它並作出響應。我希望有任何線程能夠添加東西到隊列中,並通知記錄器新項目。

我的代碼如下。任何幫助,將不勝感激。感謝您的時間!

同步隊列代碼

#ifndef _SYNCHRONIZED_QUEUE_ 
#define _SYNCHRONIZED_QUEUE_ 

// Include Files 
#include <boost\noncopyable.hpp> 
#include <boost\thread.hpp> 
#include <queue> 

namespace GSMV 
{ 

/////////////////////////////////////////////////////////////////////////////////////// 
/// Class: SynchronizedQueue 
/// 
/// @Brief 
/// SynchronizedQueue is a thread safe STL Queue wrapper that waits on Dequeue and 
/// notifies a listening thread on Enqueue. It is not copyable. 
/////////////////////////////////////////////////////////////////////////////////////// 
template <typename T> 
class SynchronizedQueue : private boost::noncopyable 
{ 
    public: 
    struct Canceled{}; 

    /////////////////////////////////////////////////////////////////////////////////////// 
    /// Function: Constructor 
    /// 
    /// @Brief 
    /// Default constructor for the SynchronizedQueue object. 
    /////////////////////////////////////////////////////////////////////////////////////// 
    SynchronizedQueue(void) 
    { 
     // Queue is not canceled to start with 
     this->mCanceled = false;  

     // Nobody waiting yet 
     this->mWaiting = 0; 
    } 

    /////////////////////////////////////////////////////////////////////////////////////// 
    /// Function: Enqueue 
    /// 
    /// @Param const T &item: Item of type T to add to queue. 
    /// 
    /// @Brief 
    /// Adds an item of type T to the queue notifying via a condition. 
    /////////////////////////////////////////////////////////////////////////////////////// 
     void Enqueue(const T &item) 
     { 
     bool enqueued = false; 

     // acquire lock on the queue 
     boost::unique_lock<boost::mutex> lock(this->mMutex); 

     // make sure the queue is not canceled 
     if (this->mCanceled) 
     throw Canceled(); 

     // add item to the queue 
      this->mQueue.push(item); 

     // notify others that queue has a new item 
     this->mItemAvailable.notify_one(); 
     } 

    /////////////////////////////////////////////////////////////////////////////////////// 
    /// Function: Dequeue 
    /// 
    /// @Return 
    /// Item of type T from front of queue. 
    /// 
    /// @Brief 
    /// Returns an item of type T from the queue and deletes the front of the queue. Thread 
    /// will wait on an empty queue until it is signaled via Enqueue. 
    /////////////////////////////////////////////////////////////////////////////////////// 
     T Dequeue(void) 
     { 
     // acquire lock on the queue 
     boost::unique_lock<boost::mutex> lock(this->mMutex); 

     // make sure the queue is not canceled 
     if (this->mCanceled) 
     throw Canceled(); 

     // one more thread is waiting on this item 
     ++this->mWaiting; 

     // if the queue is empty, wait until an item is added 
     // lock is released inside the wait 
     // lock is re-acquired after the wait 
      while (this->mQueue.empty()) 
       this->mItemAvailable.wait(lock); 

     // the thread is done waiting now 
     --this->mWaiting; 

     // retrieve and remove the item from the queue 
      T item = this->mQueue.front(); 
      this->mQueue.pop(); 

      return item; 
     // lock is released 
     } 

    /////////////////////////////////////////////////////////////////////////////////////// 
    /// Function: GetSize 
    /// 
    /// @Return 
    /// The current size of the queue (number of items in the queue). 
    /// 
    /// @Brief 
    /// Returns the number of items contained in the queue. 
    /////////////////////////////////////////////////////////////////////////////////////// 
     int GetSize(void) 
     { 
     // acquire lock on the queue 
     boost::unique_lock<boost::mutex> lock(this->mMutex); 

     // make sure the queue is not canceled 
     if (this->mCanceled) 
     throw Canceled(); 

      return this->mQueue.size(); 
     // lock is released 
     } 

    /////////////////////////////////////////////////////////////////////////////////////// 
    /// Function: IsEmpty 
    /// 
    /// @Return 
    /// Boolean queue is empty. 
    /// 
    /// @Brief 
    /// Returns true if queue is empty false otherwise. 
    /////////////////////////////////////////////////////////////////////////////////////// 
     bool IsEmpty(void) 
     { 
     // acquire lock on the queue 
     boost::unique_lock<boost::mutex> lock(this->mMutex); 

     // make sure the queue is not canceled 
     if (this->mCanceled) 
     throw Canceled(); 

      return this->mQueue.empty(); 
     // lock is released 
     } 

    void Cancel(void) 
    { 
     // acquire lock on the queue 
     boost::unique_lock<boost::mutex> lock(this->mMutex); 

     // make sure the queue is not canceled 
     if (this->mCanceled) 
     throw Canceled(); 

     this->mCanceled = true; 

     // notify all others that queue has a new item 
     this->mItemAvailable.notify_all(); 

     while (0 < this->mWaiting) 
     this->mItemAvailable.wait(lock); 
    } 

    void Reset(void) 
    { 
     // acquire lock on the queue 
     boost::unique_lock<boost::mutex> lock(this->mMutex); 

     // reset the canceled arguement 
     this->mCanceled = false; 
    } 

    private: 
    bool mCanceled; 
    int mWaiting; 
     std::queue<T> mQueue; // the STL Queue 
     boost::mutex mMutex; // the mutex object 
     boost::condition_variable mItemAvailable; // the signal condition 
}; 

} // Namespace GSMV 


#endif /// _SYNCHRONIZED_QUEUE_ 

記錄器代碼

#ifndef _LOGGER_H_ 
#define _LOGGER_H_ 

#include "SynchronizedQueue.h" 
#include <string> 
#include <boost\thread.hpp> 

namespace GSMV 
{ 

static SynchronizedQueue<std::string> logQ; 

class Logger 
{ 
    public: 
    Logger(void); 
    ~Logger(void); 

    bool Start(void); 
    bool Stop(void); 
    bool IsRunning(void) const; 
    void LoggerWorkThread(void); 

    private: 
    boost::thread* mpLoggerThread; 
}; 

} // Namespace GSMV 

#endif 
// FILE END - logger.h // 



#include "Logger.h" 

using namespace GSMV; 

Logger::Logger(void) 
{ 
    this->mpLoggerThread = NULL; 
} 

Logger::~Logger(void) 
{ 
    this->Stop(); 
} 

bool Logger::Start(void) 
{ 
    bool started = this->IsRunning(); 

    if (!started) 
    { 
    this->mpLoggerThread = new boost::thread(&Logger::LoggerWorkThread, this); 
    started = (NULL != this->mpLoggerThread); 
    } 

    return started; 
} 

bool Logger::Stop(void) 
{ 
    bool stopped = !this->IsRunning(); 

    if (!stopped) 
    { 
    this->mpLoggerThread->interrupt(); 
    this->mpLoggerThread->join(); 

    delete this->mpLoggerThread; 
    this->mpLoggerThread = NULL; 

    stopped = true; 
    } 

    return stopped; 
} 

bool Logger::IsRunning(void) const 
{ 
    return (NULL != this->mpLoggerThread); 
} 

void Logger::LoggerWorkThread(void) 
{ 
    std::cout << "Enter Logger Work Thread\n" << std::endl; 

    while (this->IsRunning()) 
    { 
    std::cout << "LOG: wait for Q..." << std::endl; 
    std::string s = logQ.Dequeue(); 
    std::cout << "LOG: Got item! => " << s << std::endl; 

    boost::this_thread::interruption_point(); 
    } 

    std::cout << "Exit Logger Work Thread\n" << std::endl; 
} 

因此,使用上面的代碼我將創建一個記錄器對象,並調用Start()方法。理想情況下,它會啓動一個循環的新線程,檢查隊列中的字符串項,直到調用Stop()方法。所以回到我的主函數中,我可以將字符串推入隊列,記錄器應該得到它們,但記錄器永遠不會收到通知。如果這很重要,隊列在Logger頭文件中聲明爲「static SynchronizedQueue logQ」。 再次,我希望這裏有任何建議。謝謝!

回答

2

在調用條件變量notify_onenotify_all之前,您必須解鎖互斥鎖。

+0

嗯,你是對的!我用「lock.unlock()」解鎖了互斥鎖。然而,我仍然遇到同樣的行爲...... – akagixxer 2012-01-30 19:02:36

+0

你應該重新設計你的隊列:第一眼,我想知道在線程退出時或錯誤情況下應該返回什麼'Dequeue()? std :: queue's'pop()'不返回值是有很好的理由的!那麼:重新設計後,你的錯誤可能會變得清晰。 – Frunsi 2012-01-31 22:44:09

+0

那麼..你應該重新讀你的示例源隊列實現(http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html) - 它包含答案。另外:你添加一個'mCanceled'標誌似乎並不適合隊列本身。 – Frunsi 2012-01-31 22:50:41