2016-09-12 71 views
2

我想在Observable中封裝一個查詢ContentProvider並訂閱ContentProvider光標以提供連續更新的邏輯。如何管理RxJava中需要的Looper線程

由於可觀察到的IO工作,我需要訂閱它在Schedulers.io()。問題是,那麼我不能註冊一個ContentObserver,因爲它需要一個活套準備線程。

什麼是推薦的方式來管理它和封裝它在一個Observable

代碼來說明:

public Observable<Integer> unreadCountObservable() { 
    return Observable.create(subscriber -> { 
     new UnreadCountObservable(subscriber); 
    }); 
} 

private class UnreadCountObservable { 
    private Subscriber subscriber; 

    public UnreadCountObservable(Subscriber subscriber) { 
     this.subscriber = subscriber; 
     Cursor cursor = queryUnread(subscriber); 
     cursor.registerContentObserver(observer); 
     subscriber.add(Subscriptions.create(() -> { 
      cursor.unregisterContentObserver(observer); 
      cursor.close(); 
     })); 
    } 

    @NonNull 
    private Cursor queryUnread(Subscriber subscriber) { 
     Cursor cursor = contextProvider.getContext().getContentResolver().query(Uri.parse(CONTENT_URI),SMS_PROJECTION,SMS_SELECTION_UNREAD,SMS_PROJECTION,null); 
     if(cursor.moveToNext()) { 
      Integer count = cursor.getInt(0); 
      subscriber.onNext(count); 
     } else { 
      subscriber.onNext(0); 
     } 
     return cursor; 
    } 

    private ContentObserver observer = new ContentObserver(new Handler()) { 
     @Override 
     public boolean deliverSelfNotifications() { 
      return false; 
     } 

     @Override 
     public void onChange(boolean selfChange) { 
      Timber.d("New sms data changed"); 
      queryUnread(subscriber); 
     } 
    }; 
} 

注1與上面的代碼的問題是,它不能與.subscribeOn(Schedulers.io()被稱爲由於registerObserver,如果它被稱爲它,然後mainThread查詢同時在它們上面運行)

注:封裝在一個單一的Observable是一個關鍵要求,這個問題

的動機

我現在最好的想法是爲活動創建一個HandlerThread,使用Observable並使用該線程中的活套。但是想知道是否有更好的選擇,並且如果製作通用調度程序(例如looperIoScheduler())有意義,可能會導致問題。

回答

-1

好的,那麼爲什麼這不工作?

public Observable<Integer> unreadCountObservable() { 
    return Observable.defer(() -> Observable.create(subscriber -> { 
     new UnreadCountObservable(subscriber); 
    })) 
    .subscribeOn(Schedulers.io()); 
} 
+1

它不起作用。 SubscribeOn在訂閱鏈中上升,無論它是鏈條的第一個還是最後一個句子都沒關係。這種方式推遲運行io線程和UnreadCountObservable構造函數也運行它。 – lujop

1

Observable鏈中,您可以根據需要隨時更改線索。看看here

函數rx.Observable#observeOn(rx.Scheduler)可以位於鏈中的任何位置。嘗試做這樣的事情(僞代碼):

Observable.just(cursor) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .map((Cursor) -> { 
       cursor.registerContentObserver(observer); 
       return cursor; 
      } 
     }).observeOn(Schedulers.io()); 
subscriber.add(Subscriptions.create(() -> { 
     cursor.unregisterContentObserver(observer); 
     cursor.close(); 
    })); 
+0

謝謝,但是您能否擴展您的示例以包括光標創建並從觀察者發送更新。因爲我認爲完整的代碼不易讀取。 – lujop