8

我有點新的Rx.NET。是否有可能捕獲任何訂戶可能拋出的異常?採取以下...捕獲異常可以從訂閱拋出OnNext行動

handler.FooStream.Subscribe(
      _ => throw new Exception("Bar"), 
      _ => { }); 

目前,我正在趕上每訂閱基礎與以下的一個實例。它的實現只是使用ManualResetEvent來喚醒一個等待線程。

public interface IExceptionCatcher 
{ 
    Action<T> Exec<T>(Action<T> action); 
} 

和像這樣使用它......

handler.FooStream.Subscribe(
      _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred 
      _ => { }); 

我覺得必須有一些更好的辦法。 Rx.NET中的所有錯誤處理功能是否專門用於處理可疑錯誤?

編輯:每請求,我的實現是https://gist.github.com/1409829(接口和實現在PROD代碼分離成不同的組件)。歡迎反饋。這可能看起來很愚蠢,但我使用windsor城堡來管理許多不同的Rx用戶。

var exceptionCatcher = 
    new ExceptionCatcher(e => 
           { 
            Logger.FatalException(
             "Exception caught, shutting down.", e); 
            // Deal with unmanaged resources here 
           }, false); 


/* 
* Normally the code below exists in some class managed by an IoC container. 
* 'catcher' would be provided by the container. 
*/ 
observable /* do some filtering, selecting, grouping etc */ 
    .SubscribeWithExceptionCatching(processItems, catcher); 

回答

8

內置的可觀察運營商不這樣做你是什麼:此異常捕手與這樣

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher)); 

容器然後,它將是這樣用在observable是的IObservable的實例註冊默認情況下要求(很像事件),但是你可以做一個擴展方法來做到這一點。

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
           this IObservable<T> source 
           ) where TException : Exception 
{ 
    return Observable.CreateWithDisposable<T>(
     o => source.Subscribe(
      v => { try { o.OnNext(v); } 
        catch (TException) { } 
      }, 
      ex => o.OnError(ex), 
      () => o.OnCompleted() 
      )); 
} 

然後,可以通過此方法包裝任何可觀察值以獲得您描述的行爲。

+0

謝謝,你回答我的問題,但你確信周圍OnNext你的try/catch將捕獲異常?人們可以很容易地用返回的IObservable做些事情,這會導致訂閱的代碼在另一個線程上執行。我最初嘗試在我的Subject.OnNext調用中放置try/catch,但是沒有捕獲到異常。然而,我可以創建一個SubscribeWithExceptionHandling方法或其他東西。 – drstevens

+1

@drstevens它會從同一個線程捕獲異常。如果你的觀察者正在啓動它自己的異常操作來拋出異常,這將不會捕獲這些異常。 –

+1

考慮到有多少Rx操作導致新線程(或池中的任務),我想說這很可能是這種情況。在'handler.FooStream'和'Subscribe'之間是一個'GroupByUntil(...)。SelectMany(...)。Buffer(有時間)'。我實際上最終創建了一個'SubscribeWithCatch',它跟隨您的示例捕獲異常,然後使用傳遞給OnError處理程序的相同操作。 – drstevens