2015-05-29 98 views
0

我有一個Observable,其中每個項目都以可能導致異常的方式轉換,但可以重試。我不希望失敗破壞流,因爲每個項目代表一個獨立的事務。我能拿出最好的解決辦法是這樣的:RxJava:重試映射操作

final AtomicLong errCount = new AtomicLong(); 
    Observable.from(ImmutableList.of(1L, 2L, 3L)).flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(Long aLong) { 
      return Observable.from(ImmutableList.of(aLong)).map(new Func1<Long, Long>() { 
       @Override 
       public Long call(Long aLong) { 
        if (aLong == 2 && errCount.getAndIncrement() < 1) { 
         throw new RuntimeException("retryable error"); 
        } 
        return aLong * 100; 
       } 
      }).retry(2); 
     } 
    }).forEach(new Action1<Long>() { 
     @Override 
     public void call(Long aLong) { 
      System.out.println(aLong); 
     } 
    }); 

// Desired output: 100, 200, 300 (not 100, 100, 200, 300) 

問題:

  • 重試邏輯是非常囉嗦。
  • 如果任何項目在2次重試後失敗,則流被打破(不再處理更多項目)。我想要一個乾淨的方式來返回像Finagle的Try這樣的異常和結果,所以我可以處理所有異常。

回答

1

重試邏輯真的很冗長。

您可以完全通過切換到Observable.just(t1, t2, t3)構造函數來避免使用ImmutableList。這基本上做同樣的事情,但不太詳細。

我看到你flatMapping爲了將每個值轉換爲Observable。這可以防止映射單個值以取消訂閱整個鏈時遇到的onError。所以當一個運算符拋出時,它將會退訂這個值的內部可觀察鏈。否則,一個錯誤會導致取消訂閱並從主外部可觀察值重新訂閱。

如果你想保持這種行爲,但減少鍋爐板(除了明顯切換到Java8 lambda表達式),我可以想出2個選擇。

首先,重新登錄和重複數據刪除您的數據後重試。如果您的值具有良好的hashcodeequals實現,則只有當該集合尚未包含該值時,纔可以使用過濾器追加到有狀態集和onNext。

Observable.<Long> just(1L, 2L, 3L) 
     .map(new Func1<Long, Long>() { 
      @Override 
      public Long call(Long aLong) { 
       if (aLong == 2 && errCount.getAndIncrement() < 1) { 
        throw new RuntimeException("retryable error"); 
       } 
       return aLong * 100; 
      }}) 
     .retry(2) 
     .filter(new Func1<Long, Boolean>() { 
      Set<Long> state = null; 

      @Override 
      public Boolean call(Long a) { 
       if (state == null) 
        state = new HashSet<Long>(); 
       if (!state.contains(a)) { 
        state.add(a); 
        return true; 
       } 
       return false; 
      }}) 
     .forEach(new Action1<Long>() { 
      @Override 
      public void call(Long aLong) { 
       System.out.println(aLong); 
      }}); 

其次,你可以從何時重新訂閱它離開的地方切換你觀察到的,以簡歷。請注意,當使用緩衝區的操作符(observeOn,merge,flatMap)時,這可能會導致數據丟失問題。這是因爲他們將繼續以與下游消費者脫鉤的方式消費。所以你要確保在重試之前不要緩衝。如果您正在實施支持背壓的可觀察源,還需要考慮其他因素。

// Should resume right where it left off 
resumableObservable.map(...).retry(2).observeOn() 

// Don't do this. ObserveOn will buffer values and resume will lose data. 
resumableObservable.map(...).observeOn().retry(2) 

// Also bad if running async observables. Merging buffers so this could have data loss. 
Observable.merge(resumableObservable.map(...)).retry(2) 

如果任何一項失敗後2個重試,流斷開(沒有更多的項目被處理)。我想要一個乾淨的方式來返回像Finagle的Try這樣的異常和結果,所以我可以處理所有異常。

您可以將不可靠的地圖從Long -> Long更改爲Long -> Tuple<Long, List<Exception>>。由於這是相當多的泛型,並且很快變得麻煩,我建議使用重試運算符的不同變體,即retryWhen(Func1<Observable<Throwable>, Observable<?>>)。這裏是一個如何在你的代碼中使用它的例子。

}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){ 
@Override 
public Observable<?> call(Observable<? extends Throwable> o) { 
    final AtomicInteger count = new AtomicInteger(); 
    return o.filter(new Func1<Throwable, Boolean>() { 
     @Override 
     public Boolean call(Throwable t) { 
      return t instanceof RuntimeException || count.getAndIncrement() < 5; 
     }}).delay(1, TimeUnit.SECONDS, Schedulers.immediate()); 
}}) 

使用重試的好處是,您可以在非阻塞樣式的一段時間後輕鬆實現延遲重試。