2014-06-18 229 views
7

我想使用rxjava來構建一個示例。該示例應編排一個ReactiveWareService和一個ReactiveReviewService,以重新調用WareAndReview組合。如何等待異步Observable來完成

ReactiveWareService 
     public Observable<Ware> findWares() { 
     return Observable.from(wareService.findWares()); 
    } 

ReactiveReviewService: reviewService.findReviewsByItem does a ThreadSleep to simulate a latency! 

public Observable<Review> findReviewsByItem(final String item) { 
return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> { 
    try { 
     List<Review> reviews = reviewService.findReviewsByItem(item); 
     reviews.forEach(observer::onNext); 
     observer.onCompleted(); 
    } catch (Exception e) { 
     observer.onError(e); 
    } 
})); 
} 

public List<WareAndReview> findWaresWithReviews() throws RuntimeException { 
final List<WareAndReview> wareAndReviews = new ArrayList<>(); 

wareService.findWares() 
    .map(WareAndReview::new) 
.subscribe(wr -> { 
     wareAndReviews.add(wr); 
     //Async!!!! 
     reviewService.findReviewsByItem(wr.getWare().getItem()) 
      .subscribe(wr::addReview, 
       throwable -> System.out.println("Error while trying to find reviews for " + wr) 
      ); 
    } 
); 

//TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion! 
try { 
    Thread.sleep(3000); 
} catch (InterruptedException e) {} 

return wareAndReviews; 
} 

鑑於我不想回可觀察到的,我怎麼能等待異步觀察,(findReviewsByItem)來完成?

回答

-3

另一種方法是在開始之前聲明一個CountdownLatch。然後在onCompleted()中的那個latch上調用countDown()。然後,您可以用該閂鎖上的await()替換Thread.sleep()。

public class Example { 

    Scheduler scheduler = Schedulers.from(executor); 

    public Observable<Review> findReviewsByItem(final String item) { 
     return Observable.just(item) 
       .subscribeOn(scheduler) 
       .flatMapIterable(reviewService::findReviewsByItem); 
    } 
    public List<WareAndReview> findWaresWithReviews() { 
     return wareService 
       .findWares() 
       .map(WareAndReview::new) 
       .flatMap(wr -> { 
        return reviewService 
          .findReviewsByItem(wr.getWare().getItem()) 
          .doOnNext(wr::addReview) 
          .lastOrDefault(null) 
          .map(v -> wr); 
       }) 
       .toList() 
       .toBlocking() 
       .first(); 
    } 
} 

每當你想組合服務這樣的,想到的第一flatMap

12

你大部分的例子可以用標準RxJava運營商一起工作的順利進行重寫。如果真的有必要,你不需要爲每個子Observable進行阻塞,但只需要在最後使用toBlocking()