2013-06-27 63 views
0

我對IObservable(Reactive Extensions)有以下擴展方法。我想運行Action(無返回值),但使用我的Maybe<T> monad傳遞運行操作的異常狀態。如果動作完成,我希望通過None.Default,否則一些包含引發的異常。我處理RX異常的自定義組合器不起作用

我實現低於

public static IObservable<Maybe<Exception>> Catch<T> 
    (this IObservable<T> source 
    , Action<T> action 
    ) 
{ 
    var happy = source.Select(p => { 
     action(p); 
     return None<Exception>.Default; 
    }); 
    return happy.Catch<Maybe<Exception>,Exception> 
     (e => Observable 
       .Return(e.ToMaybe()).Concat(happy)); 

} 

,我有一個測試用例

[Fact] 
public void TryShouldHandleExceptions() 
{ 
       // Make a hot observable 
    var b = new List<int>() { 0, 1, 2 }.ToObservable().Publish().RefCount(); 

    // Execute actions and collect exception state 
    IObservable<Maybe<Exception>> e = b.Catch(v => { 
     if (v==1) 
     { 
      throw new Exception("Ouch"); 
     } 
    }); 

    var r = e.ToEnumerable().ToList(); 

    r.Count().Should().Be(3); 

    r[0].IsSome.Should().Be(false); 
    r[1].IsSome.Should().Be(true); 
    r[2].IsSome.Should().Be(false); 

} 

然而測試用例失敗和異常堆滿 起來,而不是由RX捕撈方法進行處理。注意 確定我在這裏做錯了什麼。有任何想法嗎?

順便說一句。紅利點,如果你能想出一個更好的名字 比Catch這種方法。

+0

其實我只是有一個想法,測試不完全是我的想法。也許當我重新訂閱時,我會從開頭重新訂閱,而不是從下一個值開始訂閱。雖然我認爲像我這樣做「熱」會解決這個問題。 – bradgonesurfing

+0

'Publish().RecCount()'並不真正使它「熱」。沒有觀察員的時候,這仍然是一個很冷的觀察點。一旦觀察者訂閱,它會變得很熱,並且更多的觀察者會分享這個流。一旦所有觀察員都取消訂閱,它會再次變冷,並在下一位觀察員訂閱時「重新開始」。這是因爲'RefCount()'正在跟蹤觀察者的數量,並隨着計數的變化而從冷的<->變熱。 – Brandon

+1

除了每次啓動的問題外,它還有一個問題,即在第一個異常和你的'(e => Observable ...'子句被觸發後,你返回的新的替換觀察值本身沒有'Catch '附加條款,因此_next_異常將不會被捕獲(這是因爲你重新開始是你的第一個異常再次重複)。你應該將'happy.Catch(action)'而不是'happy'編成一個。 – Brandon

回答

2

我只是太複雜了。這工作正常

public static IObservable<Maybe<Exception>> Catch<T> 
     (this IObservable<T> source 
     , Action<T> action 
     ) 
    { 
     return source.Select(p => { 
      try 
      { 
       action(p); 
       return None<Exception>.Default; 
      } 
      catch (Exception e) 
      { 
       return e.ToMaybe(); 
      } 
     }); 

    } 
+0

是的這是最直接的方式,避免了所有的重新訂閱邏輯。 – Brandon