2012-08-04 66 views
6

我負責修改同步C程序,以便它可以並行運行。目標是讓它儘可能便攜,因爲它是許多人使用的開源程序。正因爲如此,我認爲最好將程序包裝在C++層中,以便我可以利用便攜式boost庫。我已經這樣做了,一切似乎都按預期工作。多線程C++消息傳遞

我遇到的問題是決定什麼是在線程之間傳遞消息的最佳方法。幸運的是,該計劃的架構是多生產者和單一消費者的架構。更好的是,消息的順序並不重要。我已經讀過單生產者/單消費者(SPSC)隊列將受益於這種架構。那些有多線程編程經驗的人有什麼建議?我對這個東西很陌生。另外任何使用boost來實現SPSC的代碼示例都將不勝感激。

+0

查看接受答案http://stackoverflow.com/questions/8918401/does-a-multiple-producer-single-consumer-lock-free-queue-exist-for-c – walrii 2012-08-04 01:47:54

回答

7

以下是我用於合作性多任務/多線程庫(MACE)http://bytemaster.github.com/mace/的技術。除了隊列爲空時外,它具有無鎖的好處。

struct task { 
    boost::function<void()> func; 
    task* next; 
}; 


boost::mutex      task_ready_mutex; 
boost::condition_variable  task_ready; 
boost::atomic<task*>    task_in_queue; 

// this can be called from any thread 
void thread::post_task(task* t) { 
    // atomically post the task to the queue. 
    task* stale_head = task_in_queue.load(boost::memory_order_relaxed); 
    do { t->next = stale_head; 
    } while(!task_in_queue.compare_exchange_weak(stale_head, t, boost::memory_order_release)); 

    // Because only one thread can post the 'first task', only that thread will attempt 
    // to aquire the lock and therefore there should be no contention on this lock except 
    // when *this thread is about to block on a wait condition. 
    if(!stale_head) { 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex); 
     task_ready.notify_one(); 
    } 
} 

// this is the consumer thread. 
void process_tasks() { 
    while(!done) { 
    // this will atomically pop everything that has been posted so far. 
    pending = task_in_queue.exchange(0,boost::memory_order_consume); 
    // pending is a linked list in 'reverse post order', so process them 
    // from tail to head if you want to maintain order. 

    if(!pending) { // lock scope 
     boost::unique_lock<boost::mutex> lock(task_ready_mutex);     
     // check one last time while holding the lock before blocking. 
     if(!task_in_queue) task_ready.wait(lock); 
    } 
} 
+0

'合作':(( – 2012-08-04 07:37:31

+0

..雖然+1在每條消息中使用鏈接來避免隊列中的消息存儲 – 2012-08-04 07:52:57

+0

非常感謝。 – grouma 2012-08-04 16:55:50

1

如果只有一個消費者但是有多個生產者,那麼我會使用一個數組或者一些具有O(1)訪問時間的數組結構的數據結構,其中每個數組槽都代表一個生產者消費者隊列。單生產者消費者隊列的巨大優勢在於,您可以在沒有任何顯式同步機制的情況下使其無鎖,從而使其成爲多線程環境中非常快速的數據結構。請參閱my answer here瞭解單生產者消費者隊列的基本實現。

+0

我採用這種技術之前採用原子解決方案我遇到的問題是它沒有擴展,它在緩衝區'空閒'時消耗內存,如果你不知道什麼線程可能會提前發佈(通用),那麼你必須動態調整大小(通過鎖定)或硬編碼「最大」最大線程數。 – bytemaster 2012-08-04 02:34:05

+0

我正在考慮使用循環隊列......這樣就不需要重新調整隊列的大小。 – Jason 2012-08-04 04:17:20

+0

這是可以填滿的'固定大小隊列',但是如果你有N個線程,那麼每個線程或者需要一個輸入用於所有其他N個線程,或者第一次新線程試圖與另一個線程通信時,它必須分配它自己的單個生產者/單個消費者隊列。這是調整大小與硬代碼問題。 – bytemaster 2012-08-04 04:21:29

1

在網上有很多生產者 - 消費者隊列的例子,對多個生產者/消費者是安全的。 @bytemaster發佈了一個在每個消息中使用鏈接來消除隊列類本身的存儲 - 這是一個很好的方法,我自己在嵌入式作業中使用它。

在隊列類必須提供存儲的地方,我通常會使用大小爲N的「池隊列」,並在啓動時加載N *個消息類實例。需要通信的線程必須從池中彈出*消息,將其加載並將其排隊。當最終「用完」時,*消息被推回到池中。這限制了消息的數量,因此所有隊列只需要長度爲N - 不需要調整大小,不需要新建(),不需要刪除(),便於泄漏檢測。

+0

我簡化了答案的代碼,但實際上每個'任務'都存儲'未知大小'的函子(以避免boost :: function <>的堆分配)。此外,我把這個任務加倍作爲一個參考數據,只要未來持有它就保持活力。我最終每個任務有1個malloc,並且每秒可以推送240萬個sync post/wait操作。 – bytemaster 2012-08-04 14:46:59