目前我正在處理管道數據流,其中除階段1以外的每個階段都是運行消費者和生產者的async
。我有物品「流經」我的管道,這些參考物品。在第3階段,我想創建一個循環並緩衝滿足特殊條件(Stage Loop)的所有對象。TPL數據流 - 有條件的循環
如果新對象進入(第3階段),同時還有其他對象正在緩衝(Stage Loop),我想檢查它們是否與它們的引用項目匹配,如果是,則將它們發佈到Stage Loop的BufferBlock
。
問題是,如何從Stage 3中檢查Stage Loop中所有對象的引用項目?
管道還挺看起來是這樣的:
Incoming objects ->
BufferBlock1 -> Parsing (Stage2) ->
BufferBlock2 -> Processing (Stage3) ->
BufferBlock3 -> Stage Loop ->
Back to BufferBlock 2
我想我會遇到一個計時問題,其中object1進來,將在CheckForExistingProcessing內被拒絕並被轉發到stage4,而object2進來並通過檢查,因爲兩個對象的引用項被釋放鎖定它。所以我需要檢查stage4中是否有對象,即使在執行檢查之前引用了相同的項目。我需要的是某種Queue,它根據對鎖的檢查以及是否已經爲同一項目鎖定了項目來推遲對象),但是保持對象直到鎖再次可用。 – Peter
然後您可以在此步驟嘗試BlockingCollection。 – VMAtm
我能夠消除循環和鎖定我的管道,我已經爲它創建了一個測試類。一個問題是,如果我需要按照它們進來的順序導入文件,並且我在「一次一個文件」的基礎上創建管道。如果我打電話給SendAsync,我怎麼確定它會按照它的順序處理?我可以在管道之前添加一個BufferBlock,消費者可以在這裏確保訂單,但有沒有更智能的方法? – Peter