你好RxJava主人,RxJava更改線程後CONCAT地圖
在我目前的Android項目,我遇到了一些問題僵局而與RxJava和SQLite播放。我的問題是:
- 我在一個線程
- 調用Web服務啓動一個事務,並保存一些東西在數據庫
- CONCAT映射另一個觀察的功能
- 嘗試寫其他的東西在數據庫上--->獲得了僵局
這裏是我的代碼:
//define a scheduler for managing transaction in the same thread
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
Observable.just(null)
/* Go to known thread to open db transaction */
.observeOn(mScheduler)
.doOnNext(o -> myStore.startTransaction())
/* Do some treatments that change thread */
.someWebServiceCallWithRetrofit()
/* Return to known thread to save items in db */
.observeOn(mScheduler)
.flatMap(items -> saveItems(items))
.subscribe();
public Observable<Node> saveItems(List<Item> items) {
Observable.from(items)
.doOnNext(item -> myStore.saveItem(item)) //write into the database OK
.concatMap(tab -> saveSubItems(item));
}
public Observable<Node> saveSubItems(Item item) {
return Observable.from(item.getSubItems())
.doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different
}
爲什麼突然所有的RxJava正在改變線程?即使我指定我希望他在我自己的調度程序中觀察。我通過在saveSubItem之前添加另一個observeOn來做了一個骯髒的修復,但這可能不是正確的解決方案。
我知道,當你調用與改造web服務,響應被轉發到一個新的線程(這就是爲什麼我創建了自己的調度找回在線程我開始我的SQL事務)。但是,我真的不明白RxJava如何管理線程。
非常感謝您的幫助。