2010-01-16 44 views
10

我的應用程序中有兩個線程共享的std::list<Info> infoList。這些2個線程如下訪問此列表:實時應用程序中多線程之間同步容器訪問的最佳方式

線程1:列表(根據情況) 線程2上使用push_back()pop_front()clear():使用一個iterator穿過的項目來迭代列出並採取一些行動。

線程2是迭代的列表如下所示:

for(std::list<Info>::iterator i = infoList.begin(); i != infoList.end(); ++i) 
{ 
    DoAction(i); 
} 

的代碼是使用GCC 4.4.2編譯。

有時++我導致段錯誤並崩潰應用程序。該錯誤是由std_list.h行143在下面行:

_M_node = _M_node->_M_next; 

我想這是一個競速條件。當線程2迭代它時,該列表可能已被線程1更改或清除。

我使用Mutex來同步對這個列表的訪問,並且在我的初始測試過程中一切正常。但系統在壓力測試下凍結,使得這個解決方案完全無法接受。這個應用程序是一個實時應用程序,我需要找到一個解決方案,這樣兩個線程可以儘可能快地運行,而不會影響總應用程序的吞吐量。

我的問題是這樣的: 線程1和線程2需要儘可能快地執行,因爲這是一個實時應用程序。我能做些什麼來防止這個問題,並仍然保持應用程序的性能?有沒有可用於這種問題的無鎖算法?

它確定如果我錯過了線程2迭代中新增的Info對象,但是我能做些什麼來防止迭代器成爲懸掛指針?

感謝

+1

Sutters關於編寫無鎖隊列的文章可能有所幫助:http://www.ddj.com/cpp/210604448 –

+0

http://stackoverflow.com/questions/1724630/how-do-i-build- a-lockless-queue –

+0

爲什麼你不能使用隊列而不是列表?您的消費者線程如何通知新的Info對象? –

回答

4

通常這樣使用STL容器是不安全的。你將不得不實現特定的方法來使你的代碼線程安全。您選擇的解決方案取決於您的需求。我可能會通過維護兩個列表來解決這個問題,每個線程都有一個。並通過lock free queue(在這個問題的評論中提到)溝通變化。您還可以通過將它們包裝在boost :: shared_ptr中來限制Info對象的生命週期,例如

typedef boost::shared_ptr<Info> InfoReference; 
typedef std::list<InfoReference> InfoList; 

enum CommandValue 
{ 
    Insert, 
    Delete 
} 

struct Command 
{ 
    CommandValue operation; 
    InfoReference reference; 
} 

typedef LockFreeQueue<Command> CommandQueue; 

class Thread1 
{ 
    Thread1(CommandQueue queue) : m_commands(queue) {} 
    void run() 
    { 
     while (!finished) 
     { 
      //Process Items and use 
      // deleteInfo() or addInfo() 
     }; 

    } 

    void deleteInfo(InfoReference reference) 
    { 
     Command command; 
     command.operation = Delete; 
     command.reference = reference; 
     m_commands.produce(command); 
    } 

    void addInfo(InfoReference reference) 
    { 
     Command command; 
     command.operation = Insert; 
     command.reference = reference; 
     m_commands.produce(command); 
    } 
} 

private: 
    CommandQueue& m_commands; 
    InfoList m_infoList; 
} 

class Thread2 
{ 
    Thread2(CommandQueue queue) : m_commands(queue) {} 

    void run() 
    { 
     while(!finished) 
     { 
      processQueue(); 
      processList(); 
     } 
    } 

    void processQueue() 
    { 
     Command command; 
     while (m_commands.consume(command)) 
     { 
      switch(command.operation) 
      { 
       case Insert: 
        m_infoList.push_back(command.reference); 
        break; 
       case Delete: 
        m_infoList.remove(command.reference); 
        break; 
      } 
     } 
    } 

    void processList() 
    { 
     // Iterate over m_infoList 
    } 

private: 
    CommandQueue& m_commands; 
    InfoList m_infoList; 
} 


void main() 
{ 
CommandQueue commands; 

Thread1 thread1(commands); 
Thread2 thread2(commands); 

thread1.start(); 
thread2.start(); 

waitforTermination(); 

} 

這還沒有被編譯。您仍然需要確保訪問您的Info對象是線程安全的。

1

爲了防止你的迭代器被無效你必須鎖定整個for循環。現在我猜想第一個線程可能在更新列表時遇到困難。我會嘗試給它一個機會在每個(或每N次迭代)完成它的工作。

在僞代碼看起來像:

mutex_lock(); 
for(...){ 
    doAction(); 
    mutex_unlock(); 
    thread_yield(); // give first thread a chance 
    mutex_lock(); 
    if(iterator_invalidated_flag) // set by first thread 
    reset_iterator(); 
} 
mutex_unlock(); 
+0

我是否需要每次在reset_iterator()時將迭代器重置到列表的開始位置; ? –

+0

@O。 Askari:取決於操作:'clear' - 當然是,'push_back' - 不需要重置。至於'pop_front' - 你可能不得不重置(例如當你在列表的開頭),在這種情況下,如果你想避免不必要的重置,你需要在線程之間共享迭代器。 – catwalk

-1

我不認爲你可以逃脫沒有任何同步在所有在這種情況下,某些操作無效您正在使用的迭代器。有了一個列表,這是相當有限的(基本上,如果兩個線程都試圖同時操作迭代器到同一個元素),但仍然存在一個危險,那就是在嘗試的同時刪除一個元素追加一個。

你是否有機會鎖住DoAction(i)?您顯然只想鎖定時間的絕對最小值,以最大限度地提高性能。從上面的代碼中,我想你會想要分解一些循環,以加快操作的兩個方面。

東西線沿線的:

while (processItems) { 
    Info item; 
    lock(mutex); 
    if (!infoList.empty()) { 
    item = infoList.front(); 
    infoList.pop_front(); 
    } 
    unlock(mutex); 
    DoAction(item); 
    delayALittle(); 
} 

而且插入功能會還是得看是這樣的:

lock(mutex); 
infoList.push_back(item); 
unlock(mutex); 

除非隊列可能是巨大的,我會忍不住使用類似std::vector<Info>或甚至std::vector<boost::shared_ptr<Info> >的東西來儘可能減少Info對象的複製(假設它們比boost :: shared_ptr複製的代價要高一些,一般來說,矢量上的操作往往比一個列表,特別是如果存儲在向量中的對象小而且便宜以便複製。

+0

謝謝,這個解決方案看起來很有前途。該列表不包含許多項目,最多20個,每個Info項目佔用大約128個字節。但是,如果我需要在infoList中保留處理的項目而不刪除它們呢? –

1

你必須決定哪個線程更重要。如果它是更新線程,那麼它必須指示迭代器線程停止,等待並重新開始。如果它是迭代器線程,它可以簡單地鎖定列表直到迭代完成。

5

您的for()循環可能會在相當長的時間內保持鎖定,具體取決於它迭代的元素數量。如果它「輪詢」隊列,不斷檢查是否有新元素可用,你可能會遇到麻煩。這使得線程擁有互斥時間不合理的長時間,使得生產者線程很少有機會插入並添加一個元素。並在此過程中焚燒大量不必要的CPU週期。

您需要一個「有界阻塞隊列」。不要自己寫,鎖設計不是微不足道的。很難找到很好的例子,其中大部分是.NET代碼。 This article看起來很有希望。

0

您必須使用一些線程庫。如果您使用的是英特爾TBB,則可以使用concurrent_vector或concurrent_queue。請參閱this

3

我想知道這個清單的目的是什麼,那麼回答這個問題會比較容易。正如Hoare所說,嘗試共享數據以在兩個線程之間進行通信通常是一個糟糕的主意,而您應該通信以共享數據:即消息傳遞。

例如,如果此列表正在對隊列建模,則可以簡單地使用兩種線程之間的各種通信方式之一(例如套接字)。消費者/生產者是一個標準和衆所周知的問題。

如果您的物品很貴,那麼只有在通信過程中傳遞指針,才能避免複製物品本身。

一般來說,共享數據是非常困難的,儘管它不幸是我們在學校聽到的唯一編程方式。通常,只有低層次的溝通「渠道」實施時應該擔心同步,你應該學會使用渠道進行交流,而不是試圖模仿他們。

+0

+1 - 僅僅爲霍爾報價,如果沒有別的。多線程編程並不是所有問題的唯一答案。 –

0

如果您想在多線程環境中繼續使用std::list,我建議將其封裝在帶互斥鎖的類中,該鎖提供對其的鎖定訪問。根據確切的用法,切換到事件驅動的隊列模型可能是有意義的,在該模型中,消息在多個工作線程正在消耗的隊列上傳遞(提示:生產者 - 消費者)。

我會認真考慮Matthieu's thought。使用多線程編程正在解決的許多問題可以通過線程或進程之間的消息傳遞更好地解決。如果您需要高吞吐量並且絕對不要求處理器共享相同的內存空間,請考慮使用諸如Message-Passing Interface (MPI)之類的東西,而不是滾動您自己的多線程解決方案。有可用的一堆C++實現的 - OpenMPIBoost.MPIMicrosoft MPI,等等,等等

1

正如你提到的,如果你的迭代消費者錯過一些新添加的條目,你不在乎,你可以使用一個禁止複製在寫下列表。這允許迭代使用者在列表的一致快照上操作,直到它首次啓動,而在其他線程中,對列表的更新產生新鮮但一致的副本,而不會干擾任何現存的快照。

這裏的交易是每次更新列表需要鎖定獨佔訪問足夠長的時間來複制整個列表。這種技術偏向於擁有許多併發讀取器和更少的頻繁更新。

嘗試向容器添加內部鎖定首先需要考慮哪些操作需要在原子組中運行。例如,在試圖彈出第一個元素之前檢查列表是否爲空,需要一個原子彈出如果不是空的操作;否則,當呼叫者收到答案並嘗試對其做出反應時,列表中的答案可能會在兩者之間發生變化。

上面的例子中不清楚什麼保證迭代必須遵守。迭代線程必須訪問列表最終中的每個元素嗎?它會多次傳球嗎?當另一個線程運行DoAction()時,一個線程從列表中刪除元素意味着什麼?我懷疑解決這些問題會導致重大的設計變更。


你在C++中的工作,和你提到需要用彈出如果 - 不空操作的隊列中。我多年前使用ACE Library的併發原語編寫了一個two-lock queue,因爲Boost thread library尚未準備好用於生產使用,並且包含這些工具的C++標準庫的機會是遙遠的夢想。將它移植到更現代的東西上會很容易。

我的這個隊列叫做concurrent::two_lock_queue,它只允許通過RAII訪問隊列頭部。這確保了獲取讀取頭的鎖將始終與鎖的釋放配合。消費者構造一個對頭元素的const訪問,一個front(頭元素的非const訪問)或者一個renewable_front(對頭元素和後繼元素的非const訪問)對象,以表示訪問頭元素的專有權的隊列。這樣的「前」對象不能被複制。

two_lock_queue還提供了一個pop_front()功能等待至少一個元素可以被刪除,但與std::queue保持的和std::stack「不混合容器突變和值複製,pop_front()返回void的風格。

在一個伴隨文件中,有一種叫做concurrent::unconditional_pop的類型,它允許通過RAII確保從當前範圍退出時隊列的頭元素將被彈出。

配套文件error.hh定義了使用函數two_lock_queue::interrupt()產生的異常,用於取消阻塞等待訪問隊列頭部的線程。

看看代碼,讓我知道如果你需要更多的解釋如何使用它。

+0

列表中的每個元素都需要由DoAction()處理,但另一個線程可能會在執行過程中刪除一個元素。這不會造成任何傷害,因爲它可以檢查Info對象的有效性。循環之後,該列表不能清空,因爲它將在其他地方使用。我爲迭代器使用了一個快照,但是我仍然遇到了如何做一個原子彈如果不是空的問題。謝謝 –

1

執行此操作的最佳方法是使用內部同步的容器。 TBB和微軟的concurrent_queue這樣做。安東尼·威廉姆斯也有在他的博客here

1

其他人已經提出無鎖替代一個良好的執行,所以,如果你是使用鎖卡住...

當您修改名單我來回答現有的迭代器可能會失效,因爲它們不再指向有效的內存(該列表在需要增長時會自動重新分配更多內存)。爲了防止失效的迭代器,你的可能當你的消費者遍歷列表時使得互斥體上的生產者塊,但是對於生產者,這將是不需要等待

如果您使用隊列而不是列表,並讓您的客戶使用同步的queue<Info>::pop_front()調用,而不是在後面可能失效的迭代器,生活會更容易。如果您的消費者確實需要一次吞下Info的大塊,那麼請使用condition variable,這會讓您的消費者阻止,直到queue.size() >= minimum

Boost庫有一個很好的條件變量的可移植實現(即使適用於舊版本的Windows),以及usual threading library stuff

對於使用(老式)鎖定的生產者 - 消費者隊列,請檢查ZThreads庫的BlockingQueue模板類。我自己並沒有使用ZThreads,擔心缺乏最新的更新,並且因爲它似乎沒有被廣泛使用。然而,我已經用它作爲我自己線程安全的生產者 - 消費者隊列的靈感(在我學習了lock-free隊列和TBB之前)。

一個無鎖隊列/堆棧庫似乎在Boost查看隊列中。希望我們在不久的將來看到新的Boost.Lockfree! :)

如果有興趣,我可以寫一個阻塞隊列的例子,它使用std :: queue和Boost線程鎖定。

編輯

由Rick的回答中引用的博客已經有一個使用的std ::隊列和升壓條件變量阻塞隊列的例子。如果消費者需要狼吞虎嚥塊,你可以按如下擴展示例:

void wait_for_data(size_t how_many) 
    { 
     boost::mutex::scoped_lock lock(the_mutex); 
     while(the_queue.size() < how_many) 
     { 
      the_condition_variable.wait(lock); 
     } 
    } 

您可能還需要調整它允許超時和取消。

您提到速度是一個問題。如果你的Info是重量級的,你應該考慮通過shared_ptr。您也可以嘗試使您的Info的固定大小,並使用memory pool(它可以比堆快得多)。

1

如果您使用的C++ 0x,你可以在內部反覆這樣同步列表:

假設類有一個名爲objects_一個模板列表,以及一個boost ::互斥mutex_命名

該方法toAll是列表包裝紙的成員方法

void toAll(std::function<void (T*)> lambda) 
{ 
boost::mutex::scoped_lock(this->mutex_); 
for(auto it = this->objects_.begin(); it != this->objects_.end(); it++) 
{ 
     T* object = it->second; 
     if(object != nullptr) 
     { 
       lambda(object); 
      } 
     } 
} 

通話:

synchronizedList1->toAll(
     [&](T* object)->void // Or the class that your list holds 
     { 
      for(auto it = this->knownEntities->begin(); it != this->knownEntities->end(); it++) 
      { 
       // Do something 
      } 
     } 
); 
相關問題