2015-12-03 46 views
3

我想用rx-java構建一個健壯的處理管道,但是我遇到了一個問題。在錯誤後重試同一項目

下面是一個例子:

public static void main(String[] args) { 
    AtomicInteger div = new AtomicInteger(-1); 
    Observable.just(1, 1, 1).map(item -> 1/div.getAndIncrement()) 
      .retry().subscribe(item -> System.out.println(item)); 
} 

在這種情況下,輸出爲4項,由於非流觀察的重播,但事實並非如此相關不理它簡單起見。我添加註釋,展示了計算達到的結果和重新訂閱點:

-1 // 1/-1 
// 1/0 (error) - resubscribes to observable 
1 // 1/1 
0 // 1/2 
0 // 1/3 

這是因爲retry操作(就像所有的重試運營),將造成訂閱一次錯誤的通知傳遞。

我的預期輸出是:

-1 // 1/-1 
// 1/0 (error) - resubscribe but resume erroneous item (1) 
1 // 1/1 
0 // 1/0 

當錯誤的通知傳遞,重新訂閱過程應包括流(重試對同一項目)中的錯誤項 - 因爲錯誤是外部而不是嵌入到已處理的項目中(因此重新處理將有意義)。

這是一些這裏我想的是不被處理爲與一些延遲重新處理的物品的外部錯誤(如數據庫連接性)的情況。我知道使用標準的重試操作員可以重新訂閱,但所有人都放棄了錯誤的項目。

我也想過在try-catch代碼,我懷疑錯誤是可能的,但增加了樣板代碼到我的處理代碼,我不喜歡我的包裹所有處理。

所以我的問題是:有沒有一個標準的方式來重試的項目失敗

我已經想過做這樣的事情(未測試):

someSubject.flatMap(
    item-> Observable.just(item) 
     .doOnError(err -> someSubject.onNext(item))).onErrorX... 

,抑制錯誤...

但這似乎不自然,而且很昂貴,我用例(創建一個可觀察每個項目)。

是否有運營商,還是運營商的結合,可能會導致重試傳遞錯誤項回到觀察到的開頭沒有「破」或包裝在不同的觀察到的項目嗎?

這也是我習慣使用async-retry後的重試樣式。

回答

4

這對於RxJava來說通常是不可能的。如果某個元素的處理失敗,那麼就沒有從該位置恢復的內置方法。你可以做的最好的方法就是試着捕捉有問題的函數回調並手動重試。第二個最好的辦法是使用flatMap其中可能有問題的計算內Observable可以單獨重試:

source.flatMap(v -> 
    Observable.just(v).map(v -> v/counter.getAndIncrement()).retry() 
) 
+0

感謝。這就是我所懷疑的。 –