我使用Interval運算符,並且我想要繼續排出項目,即使在我的管道上發生異常。間隔後繼續工作onErrorResumeNext
所以我嘗試使用onErrorResumeNext
發射一個項目的情況下例外。但是我看到在發射這個物品之後,間隔停止發射更多物品。
在這裏我的單元測試。
@Test
public void testIntervalObservableWithError() {
Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.map(item -> item = null)
.map(String::toString)
.onErrorResumeNext(t-> Observable.just("item with error emitted"))
.subscribe(System.out::print, t->{
System.out.println(t);
}
);
TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription);
testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS);
}
I'm混淆這種行爲,爲什麼如果it's接收一個項目從onErrorResumeNext
解決方案的可觀察退訂:
一些解釋後,我意識到,當一個錯誤發生可觀察到的完成。所以我最終將可以有異常的observable包裝到另一個observable中,我使用flatMap。那麼主要的Observable會繼續發射物品。
@Test
public void testIntervalObservableWithError() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.flatMap(item -> Observable.just(item)
.map(String::toString))
.subscribe(System.out::print);
TestSubscriber testSubscriber = new TestSubscriber();
testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}
如果有任何運營商可以做所有的魔法我想知道。
Regrads
嗨,請閱讀我的代碼示例。我正在使用onErrorResumeNext,它不工作。 – paul
我最初閱讀你的代碼。在你的情況下,不可能保持'interval'活着,因爲你在一個空項目上調用'.map(String :: toString)',流會立即以錯誤發生並且這個錯誤被'onResumeErrorNext()'捕獲,發射一個項目和流調用'onComplete'。這就是你的問題的答案**爲什麼可觀察的取消訂閱?**。我也更新了我的答案。 –
並再次更新。 –