2017-06-19 34 views
4

我使用TPL數據流來創建它們通過輸出到輸出bufferBlock是否有一種慣用的方式來路由在TPL數據流圖中的TransformBlock中失敗的元素?

inputQueue = new BufferBlock<InputPacket>; 
processQueue = new TransformBlock <InputPacket, OutputPacket>; 
outputQueue = new BufferBlock<OutputPacket>; 

inputQueue.LinkTo(processQueue, new DataflowLinkOptions { PropagateCompletion = true }); 
processQueue.LinkTo(outputQueue, new DataflowLinkOptions { PropagateCompletion = true }); 

是否有路徑元件,其失敗的慣用方式TransformBlock處理的輸入元件的bufferBlock?

InputPacket要素齊全的加工,動作與 processQueue關聯將返回OutputPacket將被路由到outputQueue

如果有processQueue相關的動作調用一個不可靠webservice, 然後處理一些InputPacket元素將超時, ,我想重試這些元素x次。但我不想立即嘗試它們,我想將它們放回到輸入隊列中。

我希望能夠路由InputPacket元件,其 超時回到inputQueue直到他們失敗x次,然後以failureQueue

BufferBlock<CallPacket> failureQueue = new BufferBlock<InputPacket>; 

使用LinkTo謂詞由於涉及兩種不同類型而變得複雜:

InputPacket OutputPacket 

我看起來像我可以處理這種改變:

processQueue = new TransformBlock <InputPacket, ParentPacketType>; 

,然後寫謂詞基於數據包的類型。

通過在inputElement輸出存儲作爲InputPacket成員,

但既不似乎良好的方式來實現這一點。

+1

這個項目包含LinkSubTypeTo這似乎是爲了這個目的:https://github.com/gridsum/DataflowEx –

回答

3

首先,我認爲你的術語很混亂,你應該堅持使用TPL Dataflow。數據流圖不是由隊列組成的,它由組成。並且這些塊不處理數據元素處理

現在,當TPL Dataflow不能爲您提供您想要的塊時,一種解決方案是自己構建該塊,並使用所提供的塊。這方面的一個簡單的版本看起來是這樣的:

public static IPropagatorBlock<TInput, TOutput> CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform, int retryCount, 
    ITargetBlock<(TInput, Exception)> failureBlock) 
{ 
    var failedInputs = new Dictionary<TInput, int>(); 

    TransformManyBlock<TInput, TOutput> resultBlock = null; 

    resultBlock = new TransformManyBlock<TInput, TOutput>(
     async input => 
     { 
      try 
      { 
       return new[] { transform(input) }; 
      } 
      catch (Exception exception) 
      { 
       failedInputs.TryGetValue(input, out int count); 

       if (count < retryCount) 
       { 
        failedInputs[input] = count + 1; 
        // ignoring the returned Task, to avoid deadlock 
        _ = resultBlock.SendAsync(input); 
       } 
       else 
       { 
        failedInputs.Remove(input); 
        await failureBlock.SendAsync((input, exception)); 
       } 

       return Array.Empty<TOutput>(); 
      } 
     }); 

    return resultBlock; 
} 

假設我提出:

  • 您可以使用C#7.0。如果不是,我使用的功能很容易更換。
  • 可以忽略除最後一個異常之外的所有內容。否則,Dictionary將不得不存儲所有先前的例外,然後將它們發送到failureBlock
  • 將失敗的數據元素髮送回同一個塊可以。如果沒有,該方法將不得不採取一個參數,並使用它。
  • 該塊不需要支持並行性。如果是這樣,你將不得不使代碼線程安全(你可能會開始使用ConcurrentDictionary而不是Dictionary)。
  • 輸入數據元素可以存儲在字典中(請閱讀:它們的GetHashCode行爲正確)並且不會有重複的輸入。否則,你將不得不設計一些其他機制來重新計數。
+0

我要對這個辦法的工作,它是否適合我,我會回來帶剔,歡呼複合 –

相關問題