在Rx中有一個行爲合約,其中流只能是OnNext*(OnError|OnCompleted)
。換言之,零或多個OnNext
以及最後只有一個OnError
或OnCompleted
。
所以,不,你不能配置Rx。如果你這樣做,它將不再是Rx。
但是,您可以做的是編寫一個可以重試源代碼的查詢。
如果你寫你這樣的代碼:
async Task<int> ActionAsync(int i)
{
if (i == 2)
throw new Exception();
return i;
}
void Main()
{
var sb = new Subject<int>();
sb
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry()
.Subscribe(_ => _.Dump());
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
然後你就可以獲得:
1
Exception of type 'System.Exception' was thrown.
3
根據您的意見詢問性能問題,不存在任何性能問題在使用.Retry()
,但有一個行爲問題。
如果源是冷 - 像var sb = new [] { 1, 2, 3 }.ToObservable();
- 那麼.Retry()
將重新開始與整個觀察到的序列,造成的無限序列:
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
...
在你的代碼的情況下,可觀察到的是一個炎熱的觀察到這樣這不會發生。
如果你想在冷的可觀察點上做到這一點,你需要通過.Publish(...)
使它變熱。像這樣:
var sb = new[] { 1, 2, 3 }.ToObservable();
sb
.Publish(sbp =>
sbp
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry())
.Subscribe(_ => _.Dump());
然後預期的行爲返回。
沒有Rx你不能。:-) –