2016-09-07 92 views
2

我有獨立事件流,它們與反應擴展異步處理。該處理程序可能因任何原因失敗,但該流仍在繼續。C#Rx - 忽略錯誤

但是,在Rx中,發生錯誤後,它會自動取消訂閱。這在某種程度上是可配置的嗎?

實施例:

async Task<Unit> ActionAsync(int i) 
{ 
    if (i > 1) 
     throw new Exception(); 

    i.Dump();  
    return Unit.Default; 
} 

void Main() 
{ 
    var sb = new Subject<int>(); 

    sb.SelectMany(ActionAsync).Subscribe(
     _ => { }, 
     ex => 
     { 
      ex.Dump(); 
     } 
    ); 


    sb.OnNext(1); 
    sb.OnNext(2); 
    sb.OnNext(3); 
} 

我想有以下輸出:

  • 異常

我可以實現這一點沒有嘗試/趕上ActionAsync

+0

沒有Rx你不能。:-) –

回答

6

在Rx中有一個行爲合約,其中流只能是OnNext*(OnError|OnCompleted)。換言之,零或多個OnNext以及最後只有一個OnErrorOnCompleted

所以,不,你不能配置Rx。如果你這樣做,它將不再是Rx。

但是,您可以做的是編寫一個可以重試源代碼的查詢。

如果你寫你這樣的代碼:

async Task<int> ActionAsync(int i) 
{ 
    if (i == 2) 
     throw new Exception(); 

    return i; 
} 

void Main() 
{ 
    var sb = new Subject<int>(); 

    sb 
     .SelectMany(ActionAsync) 
     .Do(_ => { }, ex => ex.Dump()) 
     .Retry() 
     .Subscribe(_ => _.Dump()); 

    sb.OnNext(1); 
    sb.OnNext(2); 
    sb.OnNext(3); 
} 

然後你就可以獲得:

 
1 
Exception of type 'System.Exception' was thrown. 
3 

根據您的意見詢問性能問題,不存在任何性能問題在使用.Retry(),但有一個行爲問題。

如果源是冷 - 像var sb = new [] { 1, 2, 3 }.ToObservable(); - 那麼.Retry()將重新開始與整個觀察到的序列,造成的無限序列:

 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
... 

在你的代碼的情況下,可觀察到的是一個炎熱的觀察到這樣這不會發生。

如果你想在冷的可觀察點上做到這一點,你需要通過.Publish(...)使它變熱。像這樣:

var sb = new[] { 1, 2, 3 }.ToObservable(); 

sb 
    .Publish(sbp => 
     sbp 
      .SelectMany(ActionAsync) 
      .Do(_ => { }, ex => ex.Dump()) 
      .Retry()) 
    .Subscribe(_ => _.Dump()); 

然後預期的行爲返回。

+0

這是什麼性能影響? Retry()調用是否昂貴? – nothrow

+0

@Yossarian - 沒有任何性能問題,但是有行爲問題。我會附上答案。 – Enigmativity

0

使用Materialize

async Task<Unit> ActionAsync(int i) 
    { 
     if (i > 1) 
      throw new Exception(); 

     i.Dump(); 
     return Unit.Default; 
    } 

    void Main() 
    { 
     var sb = new Subject<int>(); 
     sb.SelectMany(i => Observable.FromAsync(() => ActionAsync(i)).Materialize()) 
      .Subscribe(item => 
      { 
       if (item.Kind == NotificationKind.OnError) 
       { 
        item.Exception.Dump(); 
       } 
       //else if (item.Kind == NotificationKind.OnNext) 
       //{ 
       // var value = item.Value; 
       //} 
       //else if (item.Kind == NotificationKind.OnCompleted) 
       //{ 
       //} 
      } 
     ); 
     sb.OnNext(1); 
     sb.OnNext(2); 
     sb.OnNext(3); 
    } 

注意,如果核心邏輯保持不變,你將永遠不會輸出「3」既然你有例外(I> 1),其中包括3你可能想改變(i> 1)到(i == 2)以獲得問題中指定的輸出。