我想在Observable
中封裝一個查詢ContentProvider
並訂閱ContentProvider
光標以提供連續更新的邏輯。如何管理RxJava中需要的Looper線程
由於可觀察到的IO工作,我需要訂閱它在Schedulers.io()
。問題是,那麼我不能註冊一個ContentObserver
,因爲它需要一個活套準備線程。
什麼是推薦的方式來管理它和封裝它在一個Observable
。
代碼來說明:
public Observable<Integer> unreadCountObservable() {
return Observable.create(subscriber -> {
new UnreadCountObservable(subscriber);
});
}
private class UnreadCountObservable {
private Subscriber subscriber;
public UnreadCountObservable(Subscriber subscriber) {
this.subscriber = subscriber;
Cursor cursor = queryUnread(subscriber);
cursor.registerContentObserver(observer);
subscriber.add(Subscriptions.create(() -> {
cursor.unregisterContentObserver(observer);
cursor.close();
}));
}
@NonNull
private Cursor queryUnread(Subscriber subscriber) {
Cursor cursor = contextProvider.getContext().getContentResolver().query(Uri.parse(CONTENT_URI),SMS_PROJECTION,SMS_SELECTION_UNREAD,SMS_PROJECTION,null);
if(cursor.moveToNext()) {
Integer count = cursor.getInt(0);
subscriber.onNext(count);
} else {
subscriber.onNext(0);
}
return cursor;
}
private ContentObserver observer = new ContentObserver(new Handler()) {
@Override
public boolean deliverSelfNotifications() {
return false;
}
@Override
public void onChange(boolean selfChange) {
Timber.d("New sms data changed");
queryUnread(subscriber);
}
};
}
注1與上面的代碼的問題是,它不能與.subscribeOn(Schedulers.io()
被稱爲由於registerObserver,如果它被稱爲它,然後mainThread查詢同時在它們上面運行)
注:封裝在一個單一的Observable
是一個關鍵要求,這個問題
我現在最好的想法是爲活動創建一個HandlerThread,使用Observable並使用該線程中的活套。但是想知道是否有更好的選擇,並且如果製作通用調度程序(例如looperIoScheduler())有意義,可能會導致問題。
它不起作用。 SubscribeOn在訂閱鏈中上升,無論它是鏈條的第一個還是最後一個句子都沒關係。這種方式推遲運行io線程和UnreadCountObservable構造函數也運行它。 – lujop