假設我們有一組可能或可能不能處理給定文件的片狀(有時是失敗的)分析器,即給定的分析器以某種概率成功或總是失敗(p=0
)。是否有可能使用RxJava讓這組解析器訂閱一個傳入文件流並用'race'解析文件?如何使用RxJava 2編排重試的比賽?
鑑於解析器可能最初失敗,但仍然能夠解析文件,有必要讓它們重試一些退避策略。鑑於沒有解析器能夠處理給定文件也是可能的,重試計數應該被限制。
實現指數退避是比較容易使用retryWhen
像這樣的東西(source)來實現:
source.retryWhen(errors ->
errors.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);
然而,建立一個平行的比賽是我想不出該怎麼辦。看起來amb
運營商是我們在這裏想要的,但將它應用於任意數量的流似乎需要使用blockingIterable
,這(我認爲)在阻止比賽時擊敗了比賽的目的。我一直無法在互聯網上找到與amb
這個用例有關的任何有用信息。
我的企圖迄今爲止像這樣的事情:
Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers
int numParsers = parserSet.size();
Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat();
fileSource
.flatMap(f -> parsers.take(numParsers)
.map(p -> p.parse(f))
.retryWhen(/* snippet from above */)
.onErrorReturn(/* some error value */)
).take(1)
Flowable
推出.parallel()
運營商最近剛剛加入ParallelFailureHandling
(see this pr),其具有RETRY
方法,但我似乎無法得到流動人員在其中一人返回後停止重試。
這個問題可以用RxJava解決嗎?
它並沒有說它需要特別的阻止Iterable,你爲什麼認爲它需要一個? –
'Flowable.amb()'需要'Iterable <?擴展發佈者 extends T>>,我沒有看到一種方法,通過'blockingIterable'調用讓它們進入這種形式。我也嘗試使用'.reduce(Flowable :: amb)',但我無法讓它工作。 –
'Observable.amb'或'ambWith'(我自己沒有太多的經驗,所以請耐心等待我提出愚蠢的問題)? –