2017-01-31 26 views
0

我在異步消息傳遞環境(vert.x)使用​​RxJava,所以有一個看起來像這樣的流程:retryWhen與計時器似乎顛覆合併行爲

Observable.defer(() -> getEndpoint()) 
      .mergeWith(getCancellationMessage()) 
      .flatMap(endpoint -> useEndpoint(endpoint)) 
      .retryWhen(obs -> obs.flatMap(error -> { 
       if (wasCancelled(error)) { 
        return Observable.error(error); 
       } 
       return Observable.timer(/* args */) 
      })) 
      .subscribe(result -> useResult(result), 
         error -> handleError(error) 
     ); 

getCancellationMessage()實施返回可觀察的流,每當從獨立消息源接收到取消消息時都會發出錯誤。此流不會發出除Observable.error()以外的任何內容,並且它只會在接收到取消消息時發出錯誤。

如果我瞭解merge的工作原理,則當getCancellationMessage()發出錯誤時,應通過onError終止整個鏈。

但是,我發現如果retryWhen運營商在收到取消消息時正在等待定時器發出,則錯誤將被忽略,並且retryWhen循環繼續,就好像從未收到取消。

我可以通過將Observable.timer()getCancellationMessage()函數合併來解決該問題,但我不明白爲什麼我必須首先執行此操作。

這是merge/retryWhen的交互期望嗎?

編輯:

下面是什麼樣的事情了getCancellationMessage()功能正在做的一個例子:

Observable<T> getCancellationMessage() { 
    if (this.messageStream == null) { 
     this.messageStream = this.messageConsumer.toObservable() 
          .flatMap(message -> { 
           this.messageConsumer.unregister(); 
           if (isCancelMessage(message)) { 
            return Observable.error(new CancelError()); 
           } 
           else { 
            return Observable.error(new FatalError()); 
           } 
          }); 
    } 
    return this.messageStream; 
} 

請注意,我沒有自己的this.messageConsumer實施 - 這是來自我正在使用的第三方庫(vert.x),所以我不控制該Observable的實現。

據我瞭解,在messageConsumer.toObservable()方法返回提供的this class實例Observable.create()的結果,這將調用用戶的onNext方法,每當一個新的消息has arrived

messageConsumer.unregister()的呼叫會阻止接收到任何進一步的消息。

+1

閱讀文檔,它沒有提及enywhere是'Observable.timer'調用'onError'可言。由於'onError'沒有被定時器調用,子訂閱也不會調用'onError',所以它忽略了異常。老實說,我有點缺乏深入的瞭解與信心,回答這個問題,但你有沒有嘗試過的文檔表明什麼官方在[這裏](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen(RX .functions.Func1)) –

+0

我並不真正期望'Observable.timer()'產生一個錯誤,但是當'timer'運行時,'getCancellationMessage'發出錯誤是絕對正確的,而我想知道爲什麼它被忽略。即使在計時器過去之後,就好像'getCancellationMessage'從不發出'Observable.error()'。在'retryWhen'文檔中找不到任何我可以解釋我所看到的內容。 – Hoobajoob

+1

但看在我的網址公佈前,如果從'觀察的retryWith'不會調用圖形'onError',然後導致觀察到繼續循環,這聽起來像您遇到什麼。 –

回答

2

但是,我發現如果retryWhen運算符在接收到取消消息時等待定時器發出,則錯誤將被忽略,並且retryWhen循環繼續,就好像從未收到取消。

操作員retryWhen接通上游Throwable成通過你爲了得到一個值響應於重試上游或結束流提供的序列的值,並且將其路由,從而

Observable.error(new IOException()) 
.retryWhen((Observable<Throwable> error) -> error) 
.subscribe(); 

即將重試無限期地因爲內部error現在被認爲是一個價值,而不是一個例外。

retryWhen本身並不知道該error值應該認爲是一個不應該被重試,那是你的內部流動的工作:

Observable.defer(() -> getEndpoint()) 
    .mergeWith(getCancellationMessage()) 
    .flatMap(endpoint -> useEndpoint(endpoint)) 
    .retryWhen(obs -> obs 
      .takeWhile(error -> !(error instanceof CancellationException)) // <------- 
      .flatMap(error -> Observable.timer(/* args */)) 
) 
    .subscribe(result -> useResult(result), 
       error -> handleError(error) 
); 

在這裏,我們只讓錯誤通過,如果它不是CancellationException類型(您可以用您的錯誤類型替換它)。這將完成序列。

如果您想要的順序來結束一個錯誤,而不是,我們需要改變flatMap邏輯來代替:

.retryWhen(obs -> obs 
     .flatMap(error -> { 
      if (error instanceof CancellationException) { 
       return Observable.error(error); 
      } 
      return Observable.timer(/* args */); 
     }) 
) 

注意,在返回flatMapObservable.empty()不會結束序列,因爲它只是表明一個源合併是空的,但可能還有其他內部來源。特別要retryWhen,一個empty()將掛起序列無限期因爲不會有任何信號,以指示重試或結束序列。

編輯:

根據您的字眼,我認爲getCancellationMessage()是一個熱門的觀測。爲了接收他們的事件或錯誤,必須觀察到熱觀測值。當retryWhen運營商在其重試寬限期由於timer(),沒有什麼訂閱到最mergeWithgetCancellationMessage(),因此它不能停止在該點的定時器。

你要訂閱保持它,而timer執行停止它的時候了:

Observable<Object> cancel = getCancellationMessage(); 

Observable.defer(() -> getEndpoint()) 
    .mergeWith(cancel) 
    .flatMap(endpoint -> useEndpoint(endpoint)) 
    .retryWhen(obs -> obs 
     .flatMap(error -> { 
      if (error instanceof CancellationException) { 
       return Observable.error(error); 
      } 
      return Observable.timer(/* args */).takeUntil(cancel); 
     }) 
) 
    .subscribe(result -> useResult(result), 
       error -> handleError(error) 
); 

在這種情況下,如果cancel火災而計時器執行時,retryWhen會停止計時器,並終止立即取消錯誤。

使用takeUntil是一種選擇,因爲您發現,mergeWith (cancel)再次適用。

+0

感謝您的幫助解釋。我將要應用您的建議並驗證他們是否解決了此問題。 – Hoobajoob

+0

不幸的是,這似乎沒有改變行爲。如果'retryWhen'遇到導致它返回'Observable.timer()'的錯誤,則在該點之後但在定時器到期之前產生的任何錯誤都將被忽略。 'retryWhen'甚至不會被後面的錯誤調用。 – Hoobajoob

+0

我相應地更新了示例代碼。仍然困惑。 – Hoobajoob