2017-06-22 38 views
1

假設我們有一組可能或可能不能處理給定文件的片狀(有時是失敗的)分析器,即給定的分析器以某種概率成功或總是失敗(p=0)。是否有可能使用RxJava讓這組解析器訂閱一個傳入文件流並用'race'解析文件?如何使用RxJava 2編排重試的比賽?

鑑於解析器可能最初失敗,但仍然能夠解析文件,有必要讓它們重試一些退避策略。鑑於沒有解析器能夠處理給定文件也是可能的,重試計數應該被限制。

實現指數退避是比較容易使用retryWhen像這樣的東西(source)來實現:

source.retryWhen(errors -> 
       errors.zipWith(Observable.range(1, 3), (n, i) -> i) 
         .flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS)) 
); 

然而,建立一個平行的比賽是我想不出該怎麼辦。看起來amb運營商是我們在這裏想要的,但將它應用於任意數量的流似乎需要使用blockingIterable,這(我認爲)在阻止比賽時擊敗了比賽的目的。我一直無法在互聯網上找到與amb這個用例有關的任何有用信息。

我的企圖迄今爲止像這樣的事情:

Set<Parser> parserSet = new HashSet<>(); 
parserSet.add(new Parser(..., ..., ...)); 
// Add more parsers 
int numParsers = parserSet.size(); 

Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat(); 

fileSource 
    .flatMap(f -> parsers.take(numParsers) 
         .map(p -> p.parse(f)) 
         .retryWhen(/* snippet from above */) 
         .onErrorReturn(/* some error value */) 
      ).take(1) 

Flowable推出.parallel()運營商最近剛剛加入ParallelFailureHandlingsee this pr),其具有RETRY方法,但我似乎無法得到流動人員在其中一人返回後停止重試。

這個問題可以用RxJava解決嗎?

+0

它並沒有說它需要特別的阻止Iterable,你爲什麼認爲它需要一個? –

+0

'Flowable.amb()'需要'Iterable <?擴展發佈者>,我沒有看到一種方法,通過'blockingIterable'調用讓它們進入這種形式。我也嘗試使用'.reduce(Flowable :: amb)',但我無法讓它工作。 –

+0

'Observable.amb'或'ambWith'(我自己沒有太多的經驗,所以請耐心等待我提出愚蠢的問題)? –

回答

1

作出合理的假設,你的解析器是同步的,像

Set<Parser> parserSet = new HashSet<>(); 
parserSet.add(new Parser(..., ..., ...)); 
// Add more parsers 
int numParsers = parserSet.size(); 

ArrayList<Flowable<T>> parserObservableList = new ArrayList<>(); 

for (Parser p: parserSet) { 
    parserObservableList.add(Flowable.fromCallable(() -> p.parse(f)) 
            .retryWhen(/* Add your retry logic */) 
            .onErrorReturn(/* some error value */)); 
} 

Flowable.amb(parserObservableList).subscribe(/* do what you want with the results */); 

應該滿足你的要求。