0
我對IObservable(Reactive Extensions)有以下擴展方法。我想運行Action
(無返回值),但使用我的Maybe<T>
monad傳遞運行操作的異常狀態。如果動作完成,我希望通過None.Default,否則一些包含引發的異常。我處理RX異常的自定義組合器不起作用
我實現低於
public static IObservable<Maybe<Exception>> Catch<T>
(this IObservable<T> source
, Action<T> action
)
{
var happy = source.Select(p => {
action(p);
return None<Exception>.Default;
});
return happy.Catch<Maybe<Exception>,Exception>
(e => Observable
.Return(e.ToMaybe()).Concat(happy));
}
,我有一個測試用例
[Fact]
public void TryShouldHandleExceptions()
{
// Make a hot observable
var b = new List<int>() { 0, 1, 2 }.ToObservable().Publish().RefCount();
// Execute actions and collect exception state
IObservable<Maybe<Exception>> e = b.Catch(v => {
if (v==1)
{
throw new Exception("Ouch");
}
});
var r = e.ToEnumerable().ToList();
r.Count().Should().Be(3);
r[0].IsSome.Should().Be(false);
r[1].IsSome.Should().Be(true);
r[2].IsSome.Should().Be(false);
}
然而測試用例失敗和異常堆滿 起來,而不是由RX捕撈方法進行處理。注意 確定我在這裏做錯了什麼。有任何想法嗎?
順便說一句。紅利點,如果你能想出一個更好的名字 比Catch
這種方法。
其實我只是有一個想法,測試不完全是我的想法。也許當我重新訂閱時,我會從開頭重新訂閱,而不是從下一個值開始訂閱。雖然我認爲像我這樣做「熱」會解決這個問題。 – bradgonesurfing
'Publish().RecCount()'並不真正使它「熱」。沒有觀察員的時候,這仍然是一個很冷的觀察點。一旦觀察者訂閱,它會變得很熱,並且更多的觀察者會分享這個流。一旦所有觀察員都取消訂閱,它會再次變冷,並在下一位觀察員訂閱時「重新開始」。這是因爲'RefCount()'正在跟蹤觀察者的數量,並隨着計數的變化而從冷的<->變熱。 – Brandon
除了每次啓動的問題外,它還有一個問題,即在第一個異常和你的'(e => Observable ...'子句被觸發後,你返回的新的替換觀察值本身沒有'Catch '附加條款,因此_next_異常將不會被捕獲(這是因爲你重新開始是你的第一個異常再次重複)。你應該將'happy.Catch(action)'而不是'happy'編成一個。 – Brandon