2010-05-28 33 views
4

我有一個情況,許多線程都同時生成最終寫入一個長串流文件流的數據。我需要以某種方式將這些寫入序列化,以便將流寫入正確的順序。線程I/O重排序緩衝區的標準術語?

,我的2048 J任務 ..j Ñ,其每一個產生的數據塊ö輸入隊列。作業在8個線程上並行運行,但輸出塊必須以與相應輸入塊相同的順序出現在輸出塊中,輸出文件必須按以下順序排列:o o o o ...

的解決方案,這是非常不言自明:我需要在Tomasulo's algorithm某種緩衝區積累和正確的順序寫入輸出塊,類似於CPU重新排序緩衝器,或到路TCP在將它們傳遞到應用層之前重新組裝無序數據包。

在我開始編寫代碼之前,我想快速搜索文獻,看看是否有任何文章以特別聰明或有效的方式解決了這個問題,因爲我有嚴重的實時和內存限制。我似乎無法找到任何描述這一點的論文;在[線程,併發,重新排序緩衝區,重新組裝,io,序列化]的每個排列上的學者搜索沒有取得任何有用的結果。我覺得我不應該在尋找正確的術語。

是否有這種模式的常用學術名稱或關鍵字,我可以搜索?

回答

0

其實,你不應該需要積累大塊。大多數操作系統和語言提供了一種隨機訪問文件抽象,允許每個線程獨立將其輸出數據寫入文件中的正確位置,而不影響來自任何其他線程的輸出數據。

或者你是否正在寫入真正的串行輸出文件,如套接字?

+0

真正串行 - 流密碼。 – Crashworks 2010-05-28 22:14:58

+0

只有在處理完成之前輸出記錄的長度已知,您的解決方案才能正常工作。 – 2010-05-30 17:34:22

0

我個人不會使用可重新配置的緩衝區。我會爲每個作業創建一個「作業」對象,並根據您的環境使用消息傳遞或互斥來按順序接收來自每個作業的完整數據。如果下一份工作沒有完成,那麼'作家'流程會一直等到它完成。

+0

恐怕我沒有按照你的意思去做。你的意思是我應該有多少個互斥體,每個作業都有一個互斥體,並且作者應該按升序等待每個互斥體?這樣做的麻煩在於,我一次只能擁有大約20個作業的內存,並且如果遇到當前窗口恰好以相反順序完成的情況,那麼會留下幾條線程,直到「頭」一個完成。 – Crashworks 2010-05-30 23:22:10

+0

這就是我所建議的,是的。如果任務以相反的順序完成(除了史蒂夫的建議,如果您的記錄已知),或者將完整的結果緩存到磁盤,我認爲任何其他解決方案都不會更好。 – 2010-05-31 17:31:02

0

我會使用與您使用的線程數量相同長度的環形緩衝區。環緩衝區也會有相同數量的互斥量。

rinbuffer還必須知道它寫入文件的最後一個塊的id。它相當於你的環形緩衝區的0索引。

在添加到環緩衝區時,您檢查是否可以寫入,即設置索引0,然後可以一次向該文件寫入多個塊。

如果未設置索引0,只需鎖定當前線程以等待。 - 你也可以有一個比緩衝區長2-3倍的緩衝區,並且只在適當的時候鎖定,例如:當足夠的作業完成緩衝區已經啓動時。

不要忘了更新的最後一塊寫堅韌;)

書面方式將文件時,您還可以使用雙緩衝。

0

讓輸出隊列包含期貨而不是實際數據。當您從輸入隊列中檢索一個項目時,立即將相應的未來發布到輸出隊列中(注意確保這會保留順序---見下文)。當工作者線程處理完項目後,可以設置未來的值。輸出線程可以從隊列中讀取每個將來,並阻塞,直到未來準備就緒。如果後期準備工作提前做好準備,只要期貨有序,這根本不會影響產出線。

有兩種方法可確保輸出隊列上的期貨按正確的順序排列。第一種方法是使用單個互斥鎖從輸入隊列讀取數據並寫入輸出隊列。每個線程都會鎖定互斥鎖,從輸入隊列獲取一個項目,將未來發布到輸出隊列並釋放互斥鎖。

第二個是有一個主線程從輸入隊列中讀取,在輸出隊列中發佈未來,然後將項目交給工作線程執行。

在C++中有一個互斥體保護的隊列,這將是這樣的:

#include <thread> 
#include <mutex> 
#include <future> 

struct work_data{}; 
struct result_data{}; 

std::mutex queue_mutex; 
std::queue<work_data> input_queue; 
std::queue<std::future<result_data> > output_queue; 

result_data process(work_data const&); // do the actual work 

void worker_thread() 
{ 
    for(;;) // substitute an appropriate termination condition 
    { 
     std::promise<result_data> p; 
     work_data data; 
     { 
      std::lock_guard<std::mutex> lk(queue_mutex); 
      if(input_queue.empty()) 
      { 
       continue; 
      } 
      data=input_queue.front(); 
      input_queue.pop(); 
      std::promise<result_data> item_promise; 
      output_queue.push(item_promise.get_future()); 
      p=std::move(item_promise); 
     } 
     p.set_value(process(data)); 
    } 
} 

void write(result_data const&); // write the result to the output stream 

void output_thread() 
{ 
    for(;;) // or whatever termination condition 
    { 
     std::future<result_data> f; 
     { 
      std::lock_guard<std::mutex> lk(queue_mutex); 
      if(output_queue.empty()) 
      { 
       continue; 
      } 
      f=std::move(output_queue.front()); 
      output_queue.pop(); 
     } 
     write(f.get()); 
    } 
}