我有一個PublishSubject
和Subscriber
我用它來處理(可能)無限的預處理數據流。問題是一些元素可能包含一些錯誤。我想忽略它們並繼續處理。我該怎麼做?我已經試過這樣的事情:如何在RxJava 2發生錯誤後繼續處理?
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
我的問題是,沒有任何的錯誤處理方法幫我在這裏:
onErrorResumeNext()
- 指示可觀察到發射的 項的序列,如果它遇到錯誤
onErrorReturn( )
- 指示一個 可觀察時遇到錯誤以發射特定項目
onExceptionResumeNext( )
- 指示可觀察到繼續 發射項目遇到異常後(而不是另一個 各種拋出)
retry( )
- 如果可觀察源發出 錯誤,重新訂閱它的希望,它將完成而不 錯誤
retryWhen( )
- 如果可觀察源發出錯誤,傳遞 錯誤到另一個可觀察到,以確定是否重新訂閱所述源
例如,我試過retry()
,但它無限期地在錯誤後掛起了我的進程。
我也試過onErrorResumeNext()
但預計它不工作:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar")
currentSubject.onError(RuntimeException())
currentSubject.onNext("wom")
currentSubject.onComplete()
僅打印foo
和bar
。
您是否嘗試了*'onErrorResumeNext'?你已經顯示的文檔有點奇怪,但是這聽起來像是你所描述的想要它做的事情......我認爲斷開連接是API假設,如果一個Observable失敗,它可能無法推進到下一個要素;所以它的寫法是讓你提供一個新的Observable來繼續序列;所以你只需要找出錯誤導致你離開的地方,不是嗎? –
'onErrorResumeNext'有一個參數,如果我用同一個'subject'調用它,我仍然看不到剩下的下一個值和'onComplete'。如果我添加一個新的「Subject」,它顯然不會將訂戶添加到上一個。 –
好的,對你來說它是「顯而易見的」它不會有用戶,對我而言,如果錯誤處理函數沒有考慮到這一點,那麼錯誤處理函數究竟做了什麼?它被命名和記錄爲你的問題的解決方案,所以再次*你有沒有試過它*? –