2016-12-16 31 views
1

目前我正在處理管道數據流,其中除階段1以外的每個階段都是運行消費者和生產者的async。我有物品「流經」我的管道,這些參考物品。在第3階段,我想創建一個循環並緩衝滿足特殊條件(Stage Loop)的所有對象。TPL數據流 - 有條件的循環

如果新對象進入(第3階段),同時還有其他對象正在緩衝(Stage Loop),我想檢查它們是否與它們的引用項目匹配,如果是,則將它們發佈到Stage Loop的BufferBlock

問題是,如何從Stage 3中檢查Stage Loop中所有對象的引用項目?

管道還挺看起來是這樣的:

Incoming objects -> 
    BufferBlock1 -> Parsing (Stage2) -> 
    BufferBlock2 -> Processing (Stage3) -> 
    BufferBlock3 -> Stage Loop -> 
    Back to BufferBlock 2 

回答

0

你真的不需要很多BufferBlock「在你的鏈。 TPL Dataflow包含一個TransformBlock,它封裝了BufferBloсkActionBlock邏輯,並具有處理消息的輸出塊。

至於循環,可以鏈接塊相互之間with static extension method,所以這可能是看起來像

stage2.LinkTo(stage3, CheckForExistingProcessing); 
stage2.LinkTo(stage4); 

傑爾stage4是消息隊列沒有通過檢查的,必須辦理在一個循環中。您可以設置其他ActionBlock,或者,也可以簡單地使用TransformBlock將消息再次發送到適當的階段。我認爲你也可以引入重試檢查,因爲有些消息可能根本無法處理,所以有些原因。

而且,你說你有async邏輯,你應該SendAsync消息,而不是Post他們(你也可以使用過載與CancellationToken):

// asynchronously wait for a sending with resending attempts 
await stage1.SendAsync(m); 
// asynchronously wait for a sending with resending attempts with possible cancellation 
await stage2.SendAsync(m, token); 

Post方法是同步的,如果它們未被目標接受,則丟棄消息,比較嘗試傳遞消息的即使目標目前不能接受它的SendAsync方法。

+0

我想我會遇到一個計時問題,其中object1進來,將在CheckForExistingProcessing內被拒絕並被轉發到stage4,而object2進來並通過檢查,因爲兩個對象的引用項被釋放鎖定它。所以我需要檢查stage4中是否有對象,即使在執行檢查之前引用了相同的項目。我需要的是某種Queue,它根據對鎖的檢查以及是否已經爲同一項目鎖定了項目來推遲對象),但是保持對象直到鎖再次可用。 – Peter

+0

然後您可以在此步驟嘗試BlockingCollection。 – VMAtm

+0

我能夠消除循環和鎖定我的管道,我已經爲它創建了一個測試類。一個問題是,如果我需要按照它們進來的順序導入文件,並且我在「一次一個文件」的基礎上創建管道。如果我打電話給SendAsync,我怎麼確定它會按照它的順序處理?我可以在管道之前添加一個BufferBlock,消費者可以在這裏確保訂單,但有沒有更智能的方法? – Peter