我在異步消息傳遞環境(vert.x)使用RxJava,所以有一個看起來像這樣的流程:retryWhen與計時器似乎顛覆合併行爲
Observable.defer(() -> getEndpoint())
.mergeWith(getCancellationMessage())
.flatMap(endpoint -> useEndpoint(endpoint))
.retryWhen(obs -> obs.flatMap(error -> {
if (wasCancelled(error)) {
return Observable.error(error);
}
return Observable.timer(/* args */)
}))
.subscribe(result -> useResult(result),
error -> handleError(error)
);
的getCancellationMessage()
實施返回可觀察的流,每當從獨立消息源接收到取消消息時都會發出錯誤。此流不會發出除Observable.error()
以外的任何內容,並且它只會在接收到取消消息時發出錯誤。
如果我瞭解merge
的工作原理,則當getCancellationMessage()
發出錯誤時,應通過onError
終止整個鏈。
但是,我發現如果retryWhen
運營商在收到取消消息時正在等待定時器發出,則錯誤將被忽略,並且retryWhen
循環繼續,就好像從未收到取消。
我可以通過將Observable.timer()
與getCancellationMessage()
函數合併來解決該問題,但我不明白爲什麼我必須首先執行此操作。
這是merge
/retryWhen
的交互期望嗎?
編輯:
下面是什麼樣的事情了getCancellationMessage()
功能正在做的一個例子:
Observable<T> getCancellationMessage() {
if (this.messageStream == null) {
this.messageStream = this.messageConsumer.toObservable()
.flatMap(message -> {
this.messageConsumer.unregister();
if (isCancelMessage(message)) {
return Observable.error(new CancelError());
}
else {
return Observable.error(new FatalError());
}
});
}
return this.messageStream;
}
請注意,我沒有自己的this.messageConsumer
實施 - 這是來自我正在使用的第三方庫(vert.x),所以我不控制該Observable的實現。
據我瞭解,在messageConsumer.toObservable()
方法返回提供的this class實例Observable.create()
的結果,這將調用用戶的onNext
方法,每當一個新的消息has arrived。
對messageConsumer.unregister()
的呼叫會阻止接收到任何進一步的消息。
閱讀文檔,它沒有提及enywhere是'Observable.timer'調用'onError'可言。由於'onError'沒有被定時器調用,子訂閱也不會調用'onError',所以它忽略了異常。老實說,我有點缺乏深入的瞭解與信心,回答這個問題,但你有沒有嘗試過的文檔表明什麼官方在[這裏](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen(RX .functions.Func1)) –
我並不真正期望'Observable.timer()'產生一個錯誤,但是當'timer'運行時,'getCancellationMessage'發出錯誤是絕對正確的,而我想知道爲什麼它被忽略。即使在計時器過去之後,就好像'getCancellationMessage'從不發出'Observable.error()'。在'retryWhen'文檔中找不到任何我可以解釋我所看到的內容。 – Hoobajoob
但看在我的網址公佈前,如果從'觀察的retryWith'不會調用圖形'onError',然後導致觀察到繼續循環,這聽起來像您遇到什麼。 –