重試邏輯真的很冗長。
您可以完全通過切換到Observable.just(t1, t2, t3)
構造函數來避免使用ImmutableList
。這基本上做同樣的事情,但不太詳細。
我看到你flatMapping爲了將每個值轉換爲Observable。這可以防止映射單個值以取消訂閱整個鏈時遇到的onError。所以當一個運算符拋出時,它將會退訂這個值的內部可觀察鏈。否則,一個錯誤會導致取消訂閱並從主外部可觀察值重新訂閱。
如果你想保持這種行爲,但減少鍋爐板(除了明顯切換到Java8 lambda表達式),我可以想出2個選擇。
首先,重新登錄和重複數據刪除您的數據後重試。如果您的值具有良好的hashcode
和equals
實現,則只有當該集合尚未包含該值時,纔可以使用過濾器追加到有狀態集和onNext。
Observable.<Long> just(1L, 2L, 3L)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
if (aLong == 2 && errCount.getAndIncrement() < 1) {
throw new RuntimeException("retryable error");
}
return aLong * 100;
}})
.retry(2)
.filter(new Func1<Long, Boolean>() {
Set<Long> state = null;
@Override
public Boolean call(Long a) {
if (state == null)
state = new HashSet<Long>();
if (!state.contains(a)) {
state.add(a);
return true;
}
return false;
}})
.forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}});
其次,你可以從何時重新訂閱它離開的地方切換你觀察到的,以簡歷。請注意,當使用緩衝區的操作符(observeOn,merge,flatMap)時,這可能會導致數據丟失問題。這是因爲他們將繼續以與下游消費者脫鉤的方式消費。所以你要確保在重試之前不要緩衝。如果您正在實施支持背壓的可觀察源,還需要考慮其他因素。
// Should resume right where it left off
resumableObservable.map(...).retry(2).observeOn()
// Don't do this. ObserveOn will buffer values and resume will lose data.
resumableObservable.map(...).observeOn().retry(2)
// Also bad if running async observables. Merging buffers so this could have data loss.
Observable.merge(resumableObservable.map(...)).retry(2)
如果任何一項失敗後2個重試,流斷開(沒有更多的項目被處理)。我想要一個乾淨的方式來返回像Finagle的Try這樣的異常和結果,所以我可以處理所有異常。
您可以將不可靠的地圖從Long -> Long
更改爲Long -> Tuple<Long, List<Exception>>
。由於這是相當多的泛型,並且很快變得麻煩,我建議使用重試運算符的不同變體,即retryWhen(Func1<Observable<Throwable>, Observable<?>>)
。這裏是一個如何在你的代碼中使用它的例子。
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){
@Override
public Observable<?> call(Observable<? extends Throwable> o) {
final AtomicInteger count = new AtomicInteger();
return o.filter(new Func1<Throwable, Boolean>() {
@Override
public Boolean call(Throwable t) {
return t instanceof RuntimeException || count.getAndIncrement() < 5;
}}).delay(1, TimeUnit.SECONDS, Schedulers.immediate());
}})
使用重試的好處是,您可以在非阻塞樣式的一段時間後輕鬆實現延遲重試。