2016-02-02 43 views
0

在我目前的應用程序中,我通過光譜儀接收光譜數據。這些數據累計一秒鐘,然後放入循環緩衝區。現在我有一位消費者,他從緩衝區中彈出條目,然後將所有內容保存到磁盤。好吧,所有這些東西的作品。現在我需要做的是添加另一位消費者,與儲蓄並行的是,他們會對光譜進行一些處理。所以我有兩個消費者需要完全相同的數據(注意:他們只讀和不修改)。好吧,但這不起作用,因爲如果其中一個消費者彈出緩衝區的一個條目,它就會消失,所以另一個不會收到它。我想這個問題的最簡單的解決方案是給每個消費者它自己的循環緩衝區。很好,但唯一的問題是:數據條目很大。一個條目的最大大小大約爲80MB,所以爲了節省內存,在兩次沒有相同的數據的情況下將是非常好的。有沒有更好的解決方案?單生產者多消費者循環緩衝器

注:我正在使用循環緩衝區,以確保緩衝區有一個增長的限制。

+1

如果你有每消費一循環緩衝區,然後如果它舉行了'shared_ptr'您的數據這將是最好的。通過這種方式,您可以避免任何副本,並且只要您的一位消費者引用它,您就可以保證您的數據將保持有效。 –

+1

這裏明顯的要求是兩個消費者需要合作,以便他們可以共享指向數據的指針。並且它們大致在同一時間完成,否則將失去併發性。複製數據並不能解決這個問題,因爲這隻會造成一個流水帳漏洞,最終會堆積如山。 –

回答

1

我希望你能直接接收數據到隊列中,而不是將其複製周圍很多....

任何有效的解決方案,以保持數據的一個副本將不得不所有消費者同步所以只有當他們完成了一個條目才能被彈出。

你可以保留你的循環緩衝區。您只需要一個卸妝刪除條目,當讀者完成它。我強烈建議這個卸妝作家的數據。通過這種方式,它將成爲唯一能夠寫入隊列的人,並且可以簡化事情。

消費者可以從消費者那裏獲得消毒劑,告訴消費者他們做了什麼。

消費者可以與卸妝者分享他們的閱讀偏移。您可以在消費者端使用atomic_store,在移除端使用atomic_load。

它應該是這樣的:

struct Consumer { 
    ... 
    long offset = 0; 
    ... 
    Consumer() { 
    q.remover->add(this); 
    } 
    ... 
    void run() { 
    for(;;) { 
     entry& e = q.read(offset); 
     process(e); 
     atomic_store(&offest, offset + e.size()); 
    } 
    } 
}; 

struct Remover { 
    ... 
    long remove_offset = 0; 
    std::list<Consumer*> cons; 
    ... 
    void remove() { 
    // find lowest read point 
    long cons_offset = MAX_LONG; 
    for(auto p : cons) { 
     cons_offset = std::min(cons_offset, atomic_load(&p->offset)); 
    } 
    // remove up to that point 
    while(cons_offset > remove_offset) { 
     entry& e = q.read(remove_offset); 
     remove_offset += e.size(); 
     q.remove(e.size()); 
    } 
    } 
};