2017-08-27 81 views
0

我正在升級到rxjava2,我們有代碼來輪詢來自服務器的數據,當存在網絡問題時,代碼處理將延遲重試。但是,不知何故,當我試圖遷移到rxjava2時,代碼停止工作。 這裏是Rxjava1的代碼,並其正常使用,基本上遵循了這一http://blog.danlew.net/2015/03/02/dont-break-the-chain/https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a重試/重複,延遲在RxJava2中不起作用

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { 
       @Override 
       public Observable<?> call(final Observable<? extends Throwable> observable) { 
        // wrap a flatmap here so that i can check the exception type 
        return observable.flatMap(new Func1<Throwable, Observable<?>>() { 
         @Override 
         public Observable<?> call(Throwable throwable) { 
          mThrowable = throwable; 
          if (throwable instanceof IOException) { 
           return observable.compose(timerWithRetries()); 
          } else { 
           // for other errors, call onError and exit 
           return Observable.error(throwable); 
          } 
         } 
        }); 
       } 
      }) 



private <T> Observable.Transformer<T, Long> timerWithRetries() { 
    return new Observable.Transformer<T, Long>() { 

     @Override 
     public Observable<Long> call(Observable<T> observable) { 
      return observable 
        .zipWith(Observable.range(COUNTER_START, MAX_RETRIES + 1), 
          new Func2<T, Integer, Integer>() { 
           @Override 
           public Integer call(T t, Integer repeatAttempt) { 
            return repeatAttempt; 
           } 
          }) 
        .flatMap(new Func1<Integer, Observable<Long>>() { 
         @Override 
         public Observable<Long> call(Integer repeatAttempt) { 
          if (repeatAttempt == MAX_RETRIES + 1) { 
           if (mThrowable instanceof IOException) { 
            // Custom Exception 
            throw new Exception(); 
           } 
          } 
          // increase the waiting time 
          return Observable.timer(repeatAttempt * mDelaySeconds, TimeUnit.SECONDS); 
         } 
        }); 
     } 
    }; 
} 

我想包裝與flatmap錯誤,這樣我可以檢查異常類型,當它達到最大重試次數,我可以將我的自定義異常傳遞給onError。

但是,當使用Rxjava2時,timerWithRetries()方法停止工作,調用此方法,但不會執行.zipWith()及其平面圖。

但是它在包裝錯誤時沒有使用flatmap,這很奇怪。類似於

.retryWhen(error -> error.compose(timerWithRetries())) 

非常感謝您的任何建議!

回答

0

我終於設法找到解決方法。使用delay()而不是使用zipWith()和flatmap()。

AtomicInteger retryCounter = new AtomicInteger(0); 

.retryWhen(error -> error.flatmap(e -> { 
    if (e instanceof HttpException) { 
     // code that deals with specific exception 
     int retries = retryCounter.increaseAndGet(); 
     if (retries < MAX_RETRIES) { 
      // Key point here, uses .delay() 
      return Observable.just(new Object()).delay(delaySeconds, SECOND); 
     } 
    } 
})) 
0

1.x retryWhen使用了一個BehaviorSubject,它們持有最後一個Throwable並在有新用戶時重播。這主要是由於它試圖支持大多數retryrepeat運營商的「奇怪」實施。

2.x使用PublishSubject並且通常只訂閱一次(而不是再次編寫)。只有發生故障時的觀察員纔會收到錯誤值,但不會收到錯誤發生後正確的值。

實際上,observable.compose(timerWithRetries());並不完全正確,因爲您不斷將觀察者添加到主題而不清除以前的主題。

最後一種情況是因爲您使用作爲對原始錯誤的響應發出的counting-flatMapped處理程序構建主要錯誤源。

+0

那麼有沒有辦法在retry時得到throwable而不包裝flatmap?對於observable.compose,有沒有辦法清理以前的可觀察對象?謝謝 – Cheng

+0

如果您想將錯誤轉化爲日益延遲的重試信號,則無法避免使用'flatMap'。對於撰寫,你需要一些在發出重試信號後自行取消的東西;如果你不爲你的情況使用'compose',這會更簡單。 – akarnokd

+0

找到了解決方法,使用delay()代替zipwith和flatmap – Cheng