2017-08-23 65 views
0

我有一個列表Observables,我目前通過Observable.concatDelayError()訂閱。我的要求發生了變化,因此我現在想要有條件地延遲錯誤Observable.concatDelayError()有條件地繼續

如果一個Observable發出一個錯誤,我想在這一點上決定是否允許剩下的Observable繼續或者序列終止。

一個用例是如果錯誤類型爲TimeoutException,那麼我會中止其餘的Observables;否則,我繼續處理其餘的Observables。理想情況下,無論我中止還是繼續錯誤,我仍然喜歡最後報告的錯誤,因爲concatDelayError()目前的行爲。

我想我正在尋找的東西沿線:Observable.concatDelayError(Iterable<Observable<T>> sources, Func1<Throwable, Boolean> predicate)

回答

1

編輯:

保留非超時錯誤是更復雜:

PublishSubject<Object> timeout = PublishSubject.create(); 

Observable.from(sources).concatMapDelayError(v -> v.onErrorResumeNext(e -> { 
    if (e instanceof TimeoutException) { 
     timeout.onError(e); 
     return Observable.empty(); 
    } 
    return Observable.error(e); 
})) 
.takeUntil(timeout) 
.subscribe(...); 
+0

感謝@akarnokd。除了最後我需要知道發生任何**錯誤的事實之外,這種方法運作良好。所以,如果拋出TimeoutException,我想中止流;如果發生其他錯誤,我希望流繼續進行,但最終知道遇到錯誤。 –

+0

更新了答案。 – akarnokd

+0

最優秀的@akarnokd。這很好用!一方面的問題:在超時情況下返回「Observable.empty()」有什麼好處嗎?事情仍然按照預期的方式工作 - IF中止並返回錯誤。在語義上,似乎也是有意義的,即所有情況下都返回錯誤;儘管,最終,'timeout.onError(e)'是最重要的部分。 –