2014-11-16 41 views
3

我想重複發出HTTP請求並對結果進行操作。我從public Observable<NewsItem> fetchItems(NewsFeed feed)開始。一個請求獲得一些新聞項目,但我決定將其壓扁。如何使用Java RX Observable鏈接異步操作?

這個想法是使用Observable.interval()多次提出請求,然後將得到的Observables合併爲一個。

 Observable 
      .interval(timePerItem, TimeUnit.MILLISECONDS) 
      .map(i -> feed) 
      .map(feed -> fetchItems(feed)) 
      .subscribe(result -> System.out.println(result)); 

但結果是Observable<Observable<NewsItem>>Observable<NewsItem>。如何成功?

我找到了marge()運算符(RX-Java doc: Marge)。但它似乎不適合用例。

在之前的版本中,我使用了CompletableFuture<List<NewsItem>> fetchNewsItems(),但是我無法將它放入Observable鏈中。

回答

3

不知道我是否理解這個問題,但是不是您只是在尋找flatMap

Observable 
    .interval(timePerItem, TimeUnit.MILLISECONDS) 
    .flatMap(i -> fetchItems(feed)) 
    .subscribe(result -> System.out.println(result)); 
+0

看起來像這就是我正在尋找。從https://github.com/ReactiveX/RxJava/wiki學習rx-java很難。在引言中沒有提到flatMap,它在https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable中描述的方式很奇怪。謝謝! – atok

+0

http://reactivex.io/tutorials.html還有很多優秀教程的鏈接。如果您發現了一個尚未提及的優秀教程,或者您自己編寫了一個優秀教程,請在https://github.com/ReactiveX/reactivex.github.io上提出拉取請求,以便鏈接包含在該頁面中; - ) –