2017-01-16 76 views
3

我有一個需要觸發事件的Observable的API。RxJava中的指數退避

我想返回Observable,如果檢測到Internet連接,則每defaultDelay秒發出一個值,如果沒有連接,則延遲numberOfFailedAttempts^2次。

我已經嘗試了一堆各種風格,我在被觀察到的retryWhen's只計算一次最大的問題:

Observable 
    .interval(defaultDelay,TimeUnit.MILLISECONDS) 
    .observeOn(Schedulers.io()) 
    .repeatWhen((observable) -> 
     observable.concatMap(repeatObservable -> { 
      if(internetConnectionDetector.isInternetConnected()){ 
       consecutiveRetries = 0; 
       return observable; 
      } else { 
       consecutiveRetries++; 
       int backoffDelay = (int)Math.pow(consecutiveRetries,2); 
       return observable.delay(backoffDelay, TimeUnit.SECONDS); 
       } 
     }).onBackpressureDrop()) 
    .onBackpressureDrop(); 

有沒有辦法做什麼,我試圖做的?我發現了一個相關的問題(目前找不到它),但採取的方法似乎不適用於動態值。

回答

3

在代碼中有兩處錯誤:

  1. 爲了重複一些可觀察序列,該序列必須是有限的。即而不是像interval那樣,您最好使用像justfromCallable之類的東西,就像我在下面的示例中所做的那樣。
  2. repeatWhen的內部函數您需要返回新的延遲觀察源,所以而不是observable.delay()你必須返回Observable.timer()

工作代碼:

public void testRepeat() throws InterruptedException { 
    logger.info("test start"); 

    int DEFAULT_DELAY = 100; // ms 
    int ADDITIONAL_DELAY = 100; // ms 
    AtomicInteger generator = new AtomicInteger(0); 
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive 

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet) 
      .repeatWhen(counts -> { 
       AtomicInteger retryCounter = new AtomicInteger(0); 
       return counts.flatMap(c -> { 
        int retry = 0; 
        if (connectionAlive.get()) { 
         retryCounter.set(0); // reset counter 
        } else { 
         retry = retryCounter.incrementAndGet(); 
        } 
        int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2); 
        logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay); 
        return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS); 
       }); 
      }) 
      .subscribe(v -> logger.info("got {}", v)); 

    Thread.sleep(220); 
    logger.info("connection dropped"); 
    connectionAlive.set(false); 
    Thread.sleep(2000); 
    logger.info("connection is back alive"); 
    connectionAlive.set(true); 
    Thread.sleep(2000); 
    subscription.dispose(); 
    logger.info("test complete"); 
} 

查看關於repeatWhenhere詳細的文章。

+0

問題中的示例可能來自我嘗試的中間,因爲它似乎混合了我使用的兩種方法(一個定時器+重試,一個間隔+延遲訂閱),問題實際上來自該文章,該文章說應該再次使用可重試/重複觀察的輸入。不使用泄露訂閱的可觀察原因問題? –

+1

@AssortedTrailmix這是關於第一級輸入,而不是關於內部'flatMap'。請參閱該文章中的最後一個示例,瞭解非常相似的模式 –

+0

哦,我明白了,對不起,我錯過了'count'是什麼得到flatMap'ed –

1

您可以使用retryWhen運算符來配置沒有連接時的延遲。如何定期發出物品是一個單獨的主題(查找intervaltimer運營商)。如果你無法弄清楚,請另外打個問題。

我在我的Github上有一個廣泛的例子,但我會給你這裏的要點。

RetryWithDelay retryWithDelay = RetryWithDelay.builder() 
    .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT) 
    .build() 

Single.fromCallable(() -> { 
    ... 
}).retryWhen(retryWithDelay) 
.subscribe(j -> { 
    ... 
}) 

RetryWithDelay定義如下。我使用了RxJava 2.x,所以如果你使用1.x,簽名應該是Func1<Observable<? extends Throwable>, Observable<Object>>

public class RetryWithDelay implements 
     Function<Flowable<? extends Throwable>, Publisher<Object>> { 
    ... 
} 

RetryWithDelay class。

RetryStrategy enum。

這使我可以根據RetryDelayStrategy配置各種類型的超時,常量,線性,指數。對於您的使用案例,您應選擇CONSTANT_DELAY_TIMES_RETRY_COUNT延遲策略,並在編制RetryWithDelay時致電retryDelaySeconds(2)

retryWhen是一個複雜的,甚至可能是越野車的操作員。你可以在網上找到的大多數例子都使用range操作符,如果沒有重試就會失敗。有關詳細信息,請參閱我的回答here

2

我一直髮現retryWhen有些低級別,所以對於指數回退,我使用一個構建器(如Abhijit),它是單元測試的,可用於RxJava 1.x的rxjava-extras。我建議使用加蓋版本,以便延遲的指數增加不會超出您定義的最大值。

這是你如何使用它:

observable.retryWhen(
    RetryWhen.exponentialBackoff(
     delay, maxDelay, TimeUNIT.SECONDS) 
    .build()); 

我不同意retryWhen是馬車,但如果你發現它RxJava的bug報告。 Bug快速修復!

你需要 rxjava-額外 0.8.0.6或更高版本是Maven的中央:

<dependency> 
    <groupId>com.github.davidmoten</groupId> 
    <artifactId>rxjava-extras</artifactId> 
    <version>0.8.0.6</version> 
</dependency> 

讓我知道如果你需要的RxJava 2.x版。從0.1.4開始,rxjava2-extras中提供了相同的功能。

+0

我知道我曾經見過這個地方!我不想在這裏重新發明輪子,所以我可能會去這個,看看實施,看看我應該如何去了解它 –

+0

今天我注意到,我忘了實現最大值後退,但它看起來像使用版本0.8.0.6時那樣的方法簽名不存在我 –

+0

我在前一天很匆忙,所以我告訴自己我會回到我需要的那段代碼,看起來這個解決方案並沒有實現,沒有我期望的行爲,這就是重試應該在成功調用後重置(這很有意義,因爲這需要「帶外」通信)。我認爲下面概述的重複方法是我目前需要的方法,這種方法似乎針對「重試直到它工作」和「總是重試」這種更常見的情況進行了優化,如果它不起作用,延遲時間會更長「 –