2016-11-17 82 views
1

我使用Interval運算符,並且我想要繼續排出項目,即使在我的管道上發生異常。間隔後繼續工作onErrorResumeNext

所以我嘗試使用onErrorResumeNext發射一個項目的情況下例外。但是我看到在發射這個物品之後,間隔停止發射更多物品。

在這裏我的單元測試。

@Test 
public void testIntervalObservableWithError() { 
    Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS) 
      .map(time -> "item\n") 
      .map(item -> item = null) 
      .map(String::toString) 
      .onErrorResumeNext(t-> Observable.just("item with error emitted")) 
      .subscribe(System.out::print, t->{ 
         System.out.println(t); 
        } 
        ); 
    TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription); 
    testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS); 
} 

I'm混淆這種行爲,爲什麼如果it's接收一個項目從onErrorResumeNext

解決方案的可觀察退訂:

一些解釋後,我意識到,當一個錯誤發生可觀察到的完成。所以我最終將可以有異常的observable包裝到另一個observable中,我使用flatMap。那麼主要的Observable會繼續發射物品。

@Test 
public void testIntervalObservableWithError() { 
    Observable.interval(100, TimeUnit.MILLISECONDS) 
      .map(time -> "item\n") 
      .flatMap(item -> Observable.just(item) 
        .map(String::toString)) 
      .subscribe(System.out::print); 
    TestSubscriber testSubscriber = new TestSubscriber(); 
    testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS); 
} 

如果有任何運營商可以做所有的魔法我想知道。

Regrads

回答

1

您的訂閱中斷,因爲當onErrorResumeNext被觸發,你的上游已經與錯誤結束。你只是發出一個項目,而不是讓異常進入下游。爲了保持上游活着,你必須防止拋出異常。

爲了您的具體的例子解決方案可以是這樣的:

... 
    .map(time -> "item\n") 
    .map(item -> item = null) 
    .map(item -> { 
     try { 
      return item.toString(); 
     } catch (NullPointerException e) { 
      return "item with error emitted"; 
    }) 
    //no onErrorResumeNext() 
    .subscribe ... 

onErrorResumeNext剛剛替換錯誤與項目,並呼籲onComplete

+0

嗨,請閱讀我的代碼示例。我正在使用onErrorResumeNext,它不工作。 – paul

+0

我最初閱讀你的代碼。在你的情況下,不可能保持'interval'活着,因爲你在一個空項目上調用'.map(String :: toString)',流會立即以錯誤發生並且這個錯誤被'onResumeErrorNext()'捕獲,發射一個項目和流調用'onComplete'。這就是你的問題的答案**爲什麼可觀察的取消訂閱?**。我也更新了我的答案。 –

+0

並再次更新。 –

1

通過RxJava流使用的合同是,一旦錯誤被髮射沒有更多的項目應被髮射。如果你的用例要求錯誤發生後流繼續,那麼你需要將錯誤轉換爲onNext排放。創建一個包裝類型說ValueOrError<T>,並開始在Observable<ValueOrError<T>>角度思考:

Observable<Integer> source = ... 
Observable<ValueOrError<Integer>> o = 
    source.map(x -> { 
    try { 
     return new ValueOrError<>(mightThrow(x)); 
    } 
    catch (Throwable e) { 
     return new ValueOrError<>(e); 
    });