2017-03-14 62 views
2

我有一個PublishSubjectSubscriber我用它來處理(可能)無限的預處理數據流。問題是一些元素可能包含一些錯誤。我想忽略它們並繼續處理。我該怎麼做?我已經試過這樣的事情:如何在RxJava 2發生錯誤後繼續處理?

val subject = PublishSubject.create<String>() 
    subject.retry().subscribe({ 
     println("next: $it") 
    }, { 
     println("error") 
    }, { 
     println("complete") 
    }) 

    subject.onNext("foo") 
    subject.onNext("bar") 
    subject.onError(RuntimeException()) 
    subject.onNext("wom") 
    subject.onComplete() 

我的問題是,沒有任何的錯誤處理方法幫我在這裏:

onErrorResumeNext() - 指示可觀察到發射的 項的序列,如果它遇到錯誤

onErrorReturn( ) - 指示一個 可觀察時遇到錯誤以發射特定項目

onExceptionResumeNext( ) - 指示可觀察到繼續 發射項目遇到異常後(而不是另一個 各種拋出)

retry( ) - 如果可觀察源發出 錯誤,重新訂閱它的希望,它將完成而不 錯誤

retryWhen( ) - 如果可觀察源發出錯誤,傳遞 錯誤到另一個可觀察到,以確定是否重新訂閱所述源

例如,我試過retry(),但它無限期地在錯誤後掛起了我的進程。

我也試過onErrorResumeNext()但預計它不工作:

val backupSubject = PublishSubject.create<String>() 
    val subject = PublishSubject.create<String>() 
    var currentSubject = subject 
    subject.onErrorResumeNext(backupSubject).subscribe({ 
     println("next: $it") 
    }, { 
     println("error") 
     currentSubject = backupSubject 
    }, { 
     println("complete") 
    }) 

    backupSubject.subscribe({ 
     println("backup") 
    }, { 
     println("backup error") 
    }) 

    currentSubject.onNext("foo") 
    currentSubject.onNext("bar") 
    currentSubject.onError(RuntimeException()) 
    currentSubject.onNext("wom") 
    currentSubject.onComplete() 

僅打印foobar

+0

您是否嘗試了*'onErrorResumeNext'?你已經顯示的文檔有點奇怪,但是這聽起來像是你所描述的想要它做的事情......我認爲斷開連接是API假設,如果一個Observable失敗,它可能無法推進到下一個要素;所以它的寫法是讓你提供一個新的Observable來繼續序列;所以你只需要找出錯誤導致你離開的地方,不是嗎? –

+0

'onErrorResumeNext'有一個參數,如果我用同一個'subject'調用它,我仍然看不到剩下的下一個值和'onComplete'。如果我添加一個新的「Subject」,它顯然不會將訂戶添加到上一個。 –

+0

好的,對你來說它是「顯而易見的」它不會有用戶,對我而言,如果錯誤處理函數沒有考慮到這一點,那麼錯誤處理函數究竟做了什麼?它被命名和記錄爲你的問題的解決方案,所以再次*你有沒有試過它*? –

回答

7

如果您想在出現錯誤後繼續處理,這意味着您的錯誤與您的String相似,應該通過onNext。爲了在這種情況下確保類型安全,你應該使用某種形式的包裝器,它可以採取常規值或錯誤;例如,io.reactivex.Notification<T>可在RxJava 2中獲得:

PublishSubject<Notification<String>> subject = PublishSubject.create(); 

subject.subscribe(System.out::println); 

subject.onNext(Notification.createOnNext("Hello")); 
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops"))); 
subject.onNext(Notification.createOnNext("World")); 
+0

'Notification'就像函數式語言中的'Either'類型嗎? –

+1

類似於「嘗試」壓縮成一個類。 – akarnokd

相關問題