2013-07-23 20 views
2

這是使用VS 2010 concurent隊列典型的生產者/消費者模式,問題是,當我運行該程序,內存消耗撞過來1GB然後程序崩潰,可有人請點在這個代碼中解決這個問題?concurrent_queue內存消耗爆炸,然後程序崩潰

#include <iostream> 
#include <fstream> 
#include <string> 
#include <cstdlib> 
#include <ctime> 

#include <boost\shared_ptr.hpp> 
#include <boost\thread.hpp> 
#include <concurrent_queue.h> 



void wait2(int milliseconds) 
{ 
    boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds)); 
} 

class CQueue 
{ 
    Concurrency::concurrent_queue<int> Q; 

    boost::mutex    m; 
    boost::condition_variable cv; 

public: 

    CQueue():QValue(-1) 
    { 
    } 

    int QRead() 
    { 
     while(Q.empty()) 
     { 
      boost::unique_lock<boost::mutex> lk(m); 
      cv.wait(lk); 
     } 

     int res; 
     if(Q.try_pop(res)) 
     { 
      QValue = res; 
      return true; 
     } 
     return false; 
    } 

    void QWrite(int i) 
    { 
     Q.push(i); 
     cv.notify_one(); 
    } 

    int QValue; 
}; 

CQueue myqueue; 

void write() 
{ 
    int i = 0; 
    while(true) 
    { 
     myqueue.QWrite(++i); 
    } 
} 


void read() 
{ 
    while(true) 
    { 
     if(myqueue.QRead()) 
      std::cout << myqueue.QValue << std::endl; 
     else 
      std::cout << "failed to read" << std::endl; 
    } 
} 
void main() 
{ 

    boost::thread w(write); 
    boost::thread r(read); 

    w.join(); 
    r.join(); 

} 
+1

無明顯死鎖。由於讀者正在執行I/O操作,因此作者可能比閱讀器耗盡速度快得多。 – Casey

+0

爲什麼你把'unique_lock'放在一個範圍內,你知道你會在鎖關閉的時候從隊列中讀取,對吧?爲什麼當你有一個concurrent_queue時你在做這些事情? – yngccc

+0

@Casey你應該有權利,因爲通常生產者/消費者模式需要指定每個消費者的容量(閾值在他們的輸入fifo),以阻止生產者,直到有人準備好或有存儲。 – alexbuisson

回答

1

我在一個簡單的雙核上構建並測試了VS'13和Boost 1.52的代碼。如前所述,由於您的生產者 - 消費者設計沒有定義在股票(concurrent_queue)達到給定水平時阻止生產者的閾值,生產者推動隊列中太多的數據,因此內存增加,Windows開始交換,凍結過程,如果超過最大提交大小等崩潰....

注意,提交大小限制取決於幾個因素,編譯器,編譯器選項,OS上的你的程序運行,...

因此,在下面我添加了一個方法來阻止生產者如果隊列大小達到閾值,如果隊列大小低於消費者喚醒生產者的閾值。

有了這些變化,我們增加了一些同步,並且可以限制位並行,但使用中的記憶是在控制之下。

#include <iostream> 
#include <fstream> 
#include <string> 
#include <cstdlib> 
#include <ctime> 

#include "..\..\..\boost\boost\shared_ptr.hpp" 
#include "..\..\..\boost\boost\thread.hpp" 

#include <concurrent_queue.h> 

#define STOCK_THRESHOLD 1000 

void wait2(int milliseconds) 
{ 
    boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds)); 
} 

class CQueue 
{ 
    Concurrency::concurrent_queue<int> Q; 

    boost::mutex    consumerMutex; 
    boost::condition_variable consumerCV; 

    boost::mutex    producerMutex; 
    boost::condition_variable producerCV; 

public: 

    CQueue():QValue(-1) 
    { 
    } 

    int QRead() 
    { 
     while(Q.empty()) 
     { 
      boost::unique_lock<boost::mutex> lk(consumerMutex); 
      consumerCV.wait(lk); 
     } 

     int res; 
     if(Q.try_pop(res)) 
     { 
      QValue = res; 
      if(Q.unsafe_size() <= STOCK_THRESHOLD) 
      { 
       producerCV.notify_one(); 
      } 
      return true; 
     } 
     return false; 
    } 

    void QWrite(int i) 
    { 
     while(Q.unsafe_size() > STOCK_THRESHOLD){ 
      boost::unique_lock<boost::mutex> lk(producerMutex); 
      producerCV.wait_for(lk, boost::chrono::milliseconds(10)); 
     } 
     Q.push(i); 
     consumerCV.notify_one(); 
    } 

    int QValue; 
}; 

CQueue myqueue; 

void write() 
{ 
    int i = 0; 
    while(true) 
    { 
     myqueue.QWrite(++i); 

    } 
} 


void read() 
{ 
    while(true) 
    { 
     if(myqueue.QRead()) 
      std::cout << myqueue.QValue << std::endl; 
     else 
      std::cout << "failed to read" << std::endl; 
    } 
} 

void main() 
{ 

    boost::thread w(write); 
    boost::thread r(read); 

    w.join(); 
    r.join(); 

} 
1

該代碼從條件變量中失去通知,以致您的消費者線程睡眠時間過長,因此速度不夠快。

想象可以想象線程序列:

Producer      Consumer 
--+-----------------------------+------------------------------------------------------- 
1 |        | while(Q.empty()) 
2 | Q.push(i);    | boost::unique_lock<boost::mutex> lk(consumerMutex); 
3 | consumerCV.notify_one(); | 
4 |        | consumerCV.wait(lk); // notification from 3 gets lost 

要解決的互斥體必須在信號consumerCV.notify_one()前的生產狀態,雖然Q.empty()前檢查消費者的隊列的狀態舉行。

您可以方便地查看註釋掉所有互斥和條件變量調用和改變消費者忙等類似while(Q.empty()) /* busy-wait */;

如果concurrent_queue沒有提供一個函數來等待一個項目可用,那麼使用包含在互斥鎖中的非線程安全容器可能會更好。因爲它仍然需要一個互斥體和一個條件變量來正確地通知使用無鎖或免等待容器所獲得的好處將會丟失。另外,因爲生產者只生產++i,但消費者通過打印每個值來做更多的事情,消費者不可能跟上生產者,導致隊列的建立和最終的內存耗盡。