2012-07-11 27 views
0

我目前正在編寫一個應用程序(使用boost),將有一個生產者抓取框架和一個消費者閱讀框架。我在生產者中添加了一個sleep語句來模擬抓取框架的時間。我希望消費者等待一個條件變量,並且在生產者的第一個通知被喚醒時讀取幀。但是,我在日誌文件中看到的是等待條件變量的使用者(主線程),但是在消費者退出等待讀取幀之前,生產者會經歷幾個notifys。Boost線程生產者/消費者意外行爲

這裏是我的Worker.h

class Worker { 
static log4cxx::LoggerPtr m_log; 

public: 
    Worker(); 
    virtual ~Worker(); 

    void start(); 
    void stop(); 
    void getCurrentFrame(/*cv::Mat& frame*/); 

private: 
    void processFrames(); 

    volatile bool m_stopRequested; 

    bool m_bFrameReady; 
    boost::mutex m_mutex; 
    boost::condition_variable condF; 

    boost::shared_ptr<boost::thread> m_thread; 
}; 

Worker.cpp

LoggerPtr Worker::m_log(Logger::getLogger("fdx.Worker")); 

Worker::Worker() { 
    m_bFrameReady = false; 

    LOG4CXX_INFO(m_log, "Worker() c-tor"); 

    m_stopRequested = false; 

} 

Worker::~Worker() { 
    LOG4CXX_INFO(m_log, "Worker() d-tor"); 
} 

void Worker::start() 
{ 
    LOG4CXX_INFO(m_log, "Worker()::start()"); 
    assert(!m_thread); 

    m_thread = boost::shared_ptr<boost::thread>(new boost::thread(&Worker::processFrames, this)); 

    LOG4CXX_WARN(m_log, "Worker()::start() thread[" << m_thread->get_id() << "] started!"); 
} 

void Worker::stop() 
{ 
    LOG4CXX_INFO(m_log, "Worker()::stop()"); 

    if(m_thread != NULL) 
    { 
     LOG4CXX_INFO(m_log, "Worker()::stop() ThrId [" << m_thread->get_id() << "]"); 
     m_stopRequested = true; 
     m_thread->join(); 
    } 
    else 
    { 
     LOG4CXX_WARN(m_log, "Worker()::stop() The thread for this camera was never started."); 
    } 

LOG4CXX_INFO(m_log, "Worker()::stop() thread stopped!"); 
} 

void Worker::processFrames() 
{ 
    LOG4CXX_WARN(m_log, "Worker()::processFrames() Thread[" << boost::this_thread::get_id() << "] starting..."); 

    int rc = 0; 
    std::stringstream ss; 

    while(!this->m_stopRequested) 
    { 
     boost::mutex::scoped_lock lock(m_mutex); 
     LOG4CXX_WARN(m_log, "Worker()::processFrames() Got a Write lock"); 

     m_bFrameReady = true; 
     LOG4CXX_WARN(m_log, "Worker()::processFrames() Frame ready set to true"); 

     boost::this_thread::sleep(boost::posix_time::milliseconds(200)); 

     LOG4CXX_WARN(m_log, "Worker()::processFrames() Write Un-lock"); 

     lock.unlock(); 

     LOG4CXX_WARN(m_log, "Worker()::processFrames() Notify"); 

     condF.notify_one(); 
    } 
} 

void Worker::getCurrentFrame() 
{ 
    boost::mutex::scoped_lock lock(m_mutex); 

    while(!this->m_bFrameReady) 
    { 
     LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() wait for Read lock"); 
     condF.wait(lock); 
    } 

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Frame ready; Got a Read lock"); 

    m_bFrameReady = false; 

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Frame ready set to false"); 

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Read Un-lock"); 
    lock.unlock(); 

} 

的main.cpp

LoggerPtr logger(Logger::getLogger("TCamApp")); 

int main(int argc, char** argv) 
{ 
int rc = 0; 

char cwDir[FILENAME_MAX]; 

Worker* pWorker = NULL; 

memset(cwDir, 0, sizeof(cwDir)); 
getcwd(cwDir, FILENAME_MAX); 

std::cout << "Current Working Dir[" << cwDir << "]" << endl; 

std::stringstream ss; 
ss << "" << cwDir << "/logs.properties"; 
std::cout << "logs.properties file[" << ss.str() << "]" << endl; 

struct stat st; 
if(!stat(ss.str().c_str(), &st)) 
{ 
    PropertyConfigurator::configure(ss.str()); 
} 
else 
{ 
    BasicConfigurator::configure(); 
} 

LOG4CXX_INFO(logger, "Application [" << argv[0] << "] starting..."); 

pWorker = new Worker(); 
assert(pWorker); 

pWorker->start(); 

for(int i = 0; i < 100; i++) 
{ 
    pWorker->getCurrentFrame(); 

    LOG4CXX_INFO(logger, "Iteration [" << i << "]"); 


    //boost::this_thread::sleep(boost::posix_time::milliseconds(20)); 
} 

pWorker->stop(); 

LOG4CXX_INFO(logger, "Application [" << argv[0] << "] stopping..."); 

return rc; 
} 

下面是我的日誌文件的摘錄:

2012-07-11 15:33:53,943 [0x7f5707bcf780] INFO TCamApp - Application [/home/op/workspace/TestThreads/Debug/TestThreads] starting... 
2012-07-11 15:33:53,944 [0x7f5707bcf780] WARN fdx.Worker - Worker()::start() thread[0x15e4c50] started! 
2012-07-11 15:33:53,944 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() wait for Read lock 
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Thread[0x15e4c50] starting... 
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock 
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true 
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock 
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify 
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock 
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true 
2012-07-11 15:33:54,345 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock 
2012-07-11 15:33:54,345 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify 
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Frame ready; Got a Read lock 
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Frame ready set to false 
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Read Un-lock 
2012-07-11 15:33:54,346 [0x7f5707bcf780] INFO TCamApp - Iteration [0] 
2012-07-11 15:33:54,346 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() wait for Read lock 
2012-07-11 15:33:54,346 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock 
2012-07-11 15:33:54,346 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true 
2012-07-11 15:33:54,546 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock 
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify 
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock 
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true 
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock 
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify 
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock 
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true 
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock 
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify 
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock 
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true 
2012-07-11 15:33:55,148 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock 
2012-07-11 15:33:55,149 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify 
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Frame ready; Got a Read lock 
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Frame ready set to false 
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Read Un-lock 
2012-07-11 15:33:55,149 [0x7f5707bcf780] INFO TCamApp - Iteration [1] 

正如你從日誌中看到的,主線程等待讀取,而另一個線程會在主線程退出wait()之前生成多個notif。

我已經研究了一些,並認爲我已經正確編碼它,但它沒有像我預期的那樣工作。我將不勝感激對解決方案的任何建議。謝謝。

+0

除非你在一個非常特定的平臺上工作,比如x86,並且想限制自己到這樣一個平臺,['volatile'對多線程編程沒有用處](http://stackoverflow.com/a/4558031/87234)。 – GManNickG 2012-07-11 21:53:37

+0

感謝您的信息。我將刪除它。 – vagrant4ever 2012-07-11 22:08:44

+0

那麼,並用其他東西替換它。 :)像'atomic '(或原子旗)。 – GManNickG 2012-07-11 22:12:29

回答

2

這是預料之中的,因爲生產者線程正在睡眠且鎖定互斥鎖。一旦醒來,它會通知消費者並再次鎖定。對於誰能夠鎖定互斥體,不能保證「公平」。

你似乎試圖實現的是一個異步隊列。它通常包含2個條件變量:一個在隊列滿時按住生產者,另一個在隊列爲空時按住消費者。無論生產或消費隊列中的物品需要多長時間,互斥鎖都只能在推/拉操作期間鎖定 - 這應該是非常快的。

您的sleep語句可能只是偏向您的操作系統的調度程序,以給予生產者線程更多的優先級。將睡眠移出關鍵部分,模擬以外的處理推送操作,並且您應該看到消費者線程的響應更快。

在相關說明上,您可以將一個標記對象(即特殊值,如指針隊列上的空指針)推送到隊列中,以讓消費者線程知道他們不得不停下來。

+0

偉大的信息。將睡眠移出關鍵部分,一切都按預期行事。這讓我想起了從視頻設備中檢索幀的代碼,並且在文件句柄上有一個select()調用,超時。我將代碼更改爲僅鎖定緩衝區複製操作,而不是整個幀的檢索。我真的想用一個哨兵對象來終止。再次感謝!! – vagrant4ever 2012-07-13 15:30:42