2017-09-14 139 views
0

我目前正在學習如何使用RxJava,並且我想要計算從數據庫中獲取的數據,然後異步計算它們,但要等待它們全部被計算。等待Observable訂閱完成

List<Data> list = getDatasFromDatabase(); 

list.forEach(data -> compute(data)); // How can I make this async using RxJava ? 

// Here all Datas are computed 
return list; 

我怎樣才能使這種阻塞:

Observable.from(list).forEach(data -> compute(data)); 

回答

0

可以使用toBlocking()運算符來創建BlockingObservable。但是,我不清楚您需要的地方或原因?如果compute()是同步方法,那麼您的第一個解決方案將完成您所需的所有工作,因爲它將一次處理每個項目,並在最後一個compute()完成時完成。

相反,如果你想compute()以異步方式完成,那麼你可能會想:

Observable.from(list) 
    .subscribeOn(backgroundThreadScheduler) 
    .flatMap(data -> Observable.fromCallable(d -> compute(d))) 
    .toList() 
    .observeOn(foregroundScheduler) 
    .subscribe(revisedList -> doSomethingWith(revisedList)); 

如果需要doSomethingWith()同步與周圍的代碼來完成,你可以在subscriber()toBlocking()操作。不推薦這一步,因爲最終會阻塞前臺線程。