我想使用rxjava來構建一個示例。該示例應編排一個ReactiveWareService和一個ReactiveReviewService,以重新調用WareAndReview組合。如何等待異步Observable來完成
ReactiveWareService
public Observable<Ware> findWares() {
return Observable.from(wareService.findWares());
}
ReactiveReviewService: reviewService.findReviewsByItem does a ThreadSleep to simulate a latency!
public Observable<Review> findReviewsByItem(final String item) {
return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
try {
List<Review> reviews = reviewService.findReviewsByItem(item);
reviews.forEach(observer::onNext);
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
}));
}
public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
final List<WareAndReview> wareAndReviews = new ArrayList<>();
wareService.findWares()
.map(WareAndReview::new)
.subscribe(wr -> {
wareAndReviews.add(wr);
//Async!!!!
reviewService.findReviewsByItem(wr.getWare().getItem())
.subscribe(wr::addReview,
throwable -> System.out.println("Error while trying to find reviews for " + wr)
);
}
);
//TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion!
try {
Thread.sleep(3000);
} catch (InterruptedException e) {}
return wareAndReviews;
}
鑑於我不想回可觀察到的,我怎麼能等待異步觀察,(findReviewsByItem)來完成?