2012-04-16 17 views
0

保留輸入順序我有一個子系統,看起來像這樣:並行化的任務,但在輸出

 [read]   [decode]  [deliver] 
Byte  --> Undecoded --> Decoded --> Output queue 
stream   message   message  

輸入是一個插座/字節流。第一步是閱讀消息。下一步是解碼消息(並將結果存儲在消息對象中)。最後一步是傳遞信息。

我想並行化解碼步驟,但我必須保持輸出順序與輸入順序相同。所以如果接收到消息A和B並且消息B的解碼更快,我必須等到A完成傳送它。

我在Java中做了一個幼稚的初始實現,但我的分析表明我在切換步驟(從「流式閱讀器」到「解碼器」和從「解碼器」到輸出)中失去了太多。當一個24芯計算機上運行的測試程序(包括超線程)我得到:

  • 1100ķ味精/秒運行一個線程執行的時候。
  • 當運行一個簡單的12線程實現(有很多 隊列)時,會產生110 K msg/s。

我幼稚的做法可在http://pastebin.com/be1JqZy3這是超過200行的代碼,因此它可能會唯一感興趣的那些誰真的想知道它是如何可能使一個並行版本比串行(提示慢10倍:開始看看類ThreadPoolDecoder)。

有沒有人有執行此類問題時使用的模式/框架,其中工作是連續的(基於流)可以並行化但必須按輸出進行排序?

+0

我看到你手動創建你的線程。你有沒有看過使用ThreadPoolExecutor類? http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html – 2012-04-16 14:46:56

回答

2

我在我編寫的程序(用C#編寫)中處理這個問題的方法是在輸出上有一個優先級隊列。每個記錄都有一個關聯的記錄號,它在讀取時被分配。這些數字從0開始並增加。當一個線程完成一條記錄的處理時,它會將該記錄添加到優先級隊列中。

單獨的輸出線程的預期記錄號從零開始。該線程監視隊列,等待添加預期的記錄號。當預期記錄被添加時,線程將其從隊列中移除,輸出它,遞增其預期的記錄號,然後再次嘗試。

這在我的應用程序中運行得非常好,有四個線程處理記錄和一個處理輸出。

2

1100 K msg/s真的很快(消息的時間少於1微秒)。這個時間相當於從/到隊列(0.1 ... 1微秒)的輸入/獲取消息的時間。所以爲了利用並行化,你必須保持不間斷處理的時間大大超過1微秒(比如1毫秒)。如果你將小消息組裝成更大的消息,這可以完成。在一個數據包中累積1000條消息,並將數據包作爲一個工作單元處理。並行處理單元。