2013-07-05 47 views
52

預告:夥計們,這個問題不是關於如何實施重試策略。這是關於TPL Dataflow塊的正確完成。實施重試塊的正確完成

這個問題主要是我以前的問題Retry policy within ITargetBlock的延續。這個問題的答案是@ svick的智能解決方案,它利用TransformBlock(來源)和TransformManyBlock(目標)。剩下的唯一問題是以正確的方式完成該塊:等待所有重試先完成,然後完成目標塊。以下是我結束了(它只是一個片段,不付出太多的關注到非線程retries集):

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    while (target.InputCount > 0 || retries.Any()) 
     await Task.Delay(100); 

    target.Complete(); 
}); 

的想法是執行某種輪詢並驗證是否有仍然等待處理的消息,並且沒有需要重試的消息。但在這個解決方案中,我不喜歡投票的想法。

是的,我可以將添加/刪除重試的邏輯封裝到單獨的類中,甚至可以當一組重試變爲空時執行一些操作,但如何處理target.InputCount > 0條件?沒有這樣的回調,當塊沒有待處理的消息時被調用,所以似乎驗證target.ItemCount在具有小延遲的循環中是唯一的選擇。

有沒有人知道一個更聰明的方法來實現這一目標?

+1

看起來ITargetBlock通過由AsObserver Extension方法返回的觀察者支持基於推送的通知。請參閱http://msdn.microsoft.com/en-us/library/hh160359.aspx和http://msdn.microsoft.com/en-us/library/ee850490.aspx。 – JamieSee

+0

看起來你正在嘗試將異常用作正常的程序流程,這是不好的做法。 搜索谷歌或看看下面的話題上SO: http://stackoverflow.com/questions/729379/why-not-use-exceptions-as-regular-flow-of-control 所有的重試邏輯應該在try塊中,而不在異常塊中。不是你的問題的答案,而是我認爲你應該知道的東西。 – Nullius

+4

@Nullius,重試邏輯基於*例外* - 在出現瞬時錯誤時重試。我不認爲'try'塊中的重試邏輯是一個好主意,因爲你不知道錯誤類型以及這種錯誤是否是暫時的。 – Alex

回答

1

結合hwcverwe答案,JamieSee評論可能是理想的解決方案。

首先,你需要創建一個以上的事件:

var signal = new ManualResetEvent(false); 
var completedEvent = new ManualResetEvent(false); 

然後,你必須創建一個觀察者,並訂閱TransformManyBlock,所以通知您當相關事件發生:

var observer = new RetryingBlockObserver<TOutput>(completedEvent); 
var observable = target.AsObservable(); 
observable.Subscribe(observer); 

可觀察可以很容易的:

private class RetryingBlockObserver<T> : IObserver<T> { 
     private ManualResetEvent completedEvent; 

     public RetryingBlockObserver(ManualResetEvent completedEvent) {     
      this.completedEvent = completedEvent; 
     } 

     public void OnCompleted() { 
      completedEvent.Set(); 
     } 

     public void OnError(Exception error) { 
      //TODO 
     } 

     public void OnNext(T value) { 
      //TODO 
     } 
    } 

你CA N等待任一該信號,或完成(全部的源項的耗盡),或兩者

source.Completion.ContinueWith(async _ => { 

      WaitHandle.WaitAll(completedEvent, signal); 
      // Or WaitHandle.WaitAny, depending on your needs! 

      target.Complete(); 
     }); 

可以檢查了WaitAll的結果值來了解哪個事件被設置,並相應地作出反應。 您也可以將其他事件添加到代碼中,將它們傳遞給觀察者,以便在需要時設置它們。您可以區分您的行爲並在出現錯誤時作出不同的響應,例如

2

也許一個ManualResetEvent可以爲你做的伎倆。

的公共屬性添加到TransformManyBlock

private ManualResetEvent _signal = new ManualResetEvent(false); 
public ManualResetEvent Signal { get { return _signal; } } 

在這裏你去:

var retries = new HashSet<RetryingMessage<TInput>>(); 

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; 
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message => 
    { 
     try 
     { 
      var result = new[] { await transform(message.Data) }; 
      retries.Remove(message); 

      // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
      if(!retries.Any()) Signal.Set(); 
      return result; 
     } 
     catch (Exception ex) 
     { 
      message.Exceptions.Add(ex); 
      if (message.RetriesRemaining == 0) 
      { 
       if (failureHandler != null) 
        failureHandler(message.Exceptions); 

       retries.Remove(message); 

       // Sets the state of the event to signaled, allowing one or more waiting threads to proceed 
       if(!retries.Any()) Signal.Set(); 
      } 
      else 
      { 
       retries.Add(message); 
       message.RetriesRemaining--; 

       Task.Delay(retryDelay) 
        .ContinueWith(_ => target.Post(message)); 
      } 
      return null; 
     } 
    }, dataflowBlockOptions); 

source.LinkTo(target); 

source.Completion.ContinueWith(async _ => 
{ 
    //Blocks the current thread until the current WaitHandle receives a signal. 
    target.Signal.WaitOne(); 

    target.Complete(); 
}); 

我不知道您的target.InputCount設置。因此,在這個地方你改變target.InputCount您可以添加以下代碼:

if(InputCount == 0) Signal.Set(); 
+0

事情是:'target.InputCount'是一個*黑盒子* - 它是TPL Dataflow中'TransformManyBlock'的只讀屬性。 – Alex