2016-09-22 40 views
7

背景RxJava多線程與境界 - 不正確的線程

我用我的應用程序內的境界境界的訪問。當數據加載後,它會經歷強烈的處理,因此處理髮生在後臺線程上。

正在使用的編碼模式是工作單元模式,Realm只存在於DataManager下的存儲庫中。這裏的想法是每個存儲庫可以有不同的數據庫/文件存儲解決方案。

我已經試過

下面是一些類似的代碼,我在我的FooRespository類的例子。

這裏的想法是獲得Realm的一個實例,用於查詢感興趣對象的領域,返回它們並關閉領域實例。請注意,這是同步的,並在最後將對象從Realm複製到非託管狀態。

public Observable<List<Foo>> getFoosById(List<String> fooIds) { 

    Realm realm = Realm.getInstance(fooRealmConfiguration); 

    RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class); 

    for(String id : fooIds) { 

     findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id); 
     findFoosByIdQuery.or(); 
    } 

    return findFoosByIdQuery 
      .findAll() 
      .asObservable() 
      .doOnUnsubscribe(realm::close) 
      .filter(RealmResults::isLoaded) 
      .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos)))); 
} 

該代碼後面結合經由RxJava使用與重處理代碼:

dataManager.getFoosById(foo) 
      .flatMap(this::processtheFoosInALongRunningProcess) 
      .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc 
      .subscribe(tileChannelSubscriber); 

閱讀該文檔之後,我相信的是,在上述應該工作,因爲它不是異步的,因此不需要循環線程。我在同一個線程中獲得領域的實例,因此它不在線程之間傳遞,也不是對象。

當上面的執行,我得到不正確的線程

境界接入問題

。領域對象只能在創建它們的線程上訪問 。

這看起來不對。我唯一能想到的就是Realm實例池正在讓我使用主線程從另一個進程創建的現有實例。

+0

它會工作,如果你試圖做一些像'Observable.flatMap {dataManager.getFoosById(foo)''? – wint

+0

你的意思是平面圖我張貼在內部的整個鏈或只是第一部分? –

+0

我不是專家,但我認爲你需要從'Schedulers.io'的相同線程獲取Realm對象嗎?大概是這樣 '''Observable.flatMap(dataManager.getFoosById(富)) .flatMap(這:: processtheFoosInALongRunningProcess) .subscribeOn(Schedulers.io())//可能是Schedulers.computation()等 .subscribe (tileChannelSubscriber);''' 雖然 – wint

回答

2

凱所以

return findFoosByIdQuery 
     .findAll() 
     .asObservable() 

這發生在UI線程,因爲這是你從最初的

.subscribeOn(Schedulers.io()) 
叫它

Aaaaand然後你在Schedulers.io()上修補它們。

不,這不是一回事!

雖然我不喜歡複製的從零拷貝數據庫的方法,您目前的做法是百病之因realmResults.asObservable()濫用的問題,所以這裏是爲您的代碼應該是什麼攪局:

public Observable<List<Foo>> getFoosById(List<String> fooIds) { 
    return Observable.defer(() -> { 
     try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works 
      RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class); 
      for(String id : fooIds) { 
       findFoosByIdQuery.equalTo(FooFields.ID, id); 
       findFoosByIdQuery.or(); // please guarantee this works? 
      } 
      RealmResults<Foo> results = findFoosByIdQuery.findAll(); 
      return Observable.just(realm.copyFromRealm(results)); 
     } 
    }).subscribeOn(Schedulers.io()); 
} 
+0

順便說一句,值得注意的是另一個解決方案不起作用,因爲asObservable()會在非循環後臺線程上崩潰。只是在說'。 – EpicPandaForce

+0

謝謝你。在我的思考中,我並不孤單,RealmResults的asObservable()的真正意義是什麼? –

+1

在我的文章中有一個例子https://medium.com/@Zhuinden/how-to-use-realm-for-android-like-a-champ-and-how-to-tell-if-youre-這樣做,它-錯ac4f66b7f149#。當你到達那裏時,你會看到它(簡短的回答是,在looper線程(即UI線程)上,它會自動添加一個更改偵聽器,通知您並在取消訂閱時將其刪除) – EpicPandaForce

2

請注意,您正在所有RxJava處理管道之外創建實例。因此,在主線程(或任何線程上,當您撥打getFoosById()時,

僅僅因爲該方法返回一個Observable並不意味着它運行在另一個線程上只有最後創建的Observable的處理管道您getFoosById()方法的聲明在正確的線程(filter(),在flatMap()和所有調用者進行的處理)。

因此,你必須確保getFoosById()呼叫通過Schedulers.io()使用的線程上已經完成運行。實現這一

一種方式是通過使用Observable.defer()

Observable.defer(() -> dataManager.getFoosById(foo)) 
      .flatMap(this::processtheFoosInALongRunningProcess) 
      .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc 
      .subscribe(tileChannelSubscriber); 
+1

Wolfram,很高興你看到了這一點。我會在早上嘗試。這段代碼可以很容易地添加到位於調用者和存儲庫之間的DataManager中。 –