2017-02-16 48 views
0

在我的Android項目上,我很依賴RxJava2,SqlBrite(與RxJavaInterop)和SqlDelight查詢RxJava2 Db到另一個主題

我得到了一個應該無限期運行的rx流(直到我的服務停止),並且我有一個.flatMapFunction<String, ObservableSource<Action>>

意義,這flatMap包含Subject<Action>,將獲得String actionId,(對於這個問題無關)做一些處理這些actionId,並根據條件應該查詢該Action對象的數據庫,並將其分配給subject

我的第一種方法是直接做查詢:

Cursor c = db.query(...); 
if(c.moveFirst()) { 
    Action a = Action.SELECT_ALL_MAPPER.map(c); 
    subject.onNext(selectAll); 
} 

但這塊正在運行的線程和我寧願這觸發其自己的流應該做到以下幾點:

  • 查詢(應該返回0或1項)
  • ,如果有一個值:地圖Action對象和值推至subject
  • ,如果沒有相應的價值:終止/處置。
  • subject無法接收終止或錯誤。它必須爲未來的事件而活着。

我目前的做法是下面的代碼:

RxJavaInterop.toV2Observable(db.createQuery(
    Action.TABLE_NAME, 
    Action.FACTORY.Select_by_id(actionId).statement) 
    .mapToOne(new Func1<Cursor, Action>() { 
     @Override public Action call(Cursor cursor) { 
      return Action.SELECT_ALL_MAPPER.map(cursor); 
     } 
    })) 
    .take(1) 
    .subscribe(new Consumer<Action>() { 
     @Override public void accept(Action action) throws Exception { 
      subject.onNext(action); 
     } 
    }); 

雖然這似乎做的第一印象的伎倆,我看到它的幾個錯誤:

  • 我可以不處置它。即使我得到對Disposable對象的引用,我也不能從Consumer<Action>內部調用它,因爲它「可能沒有被初始化」(我理解它的原因,沒關係)。
  • 如果沒有給定ID的動作,那麼observable將永遠停留在那裏,直到VM被殺死。

所以問題:

我怎麼能這樣做?

回答

1

我寧願觸發這個對自己流

看看RxAndroid。這可能是這樣的:

yourRxStream 
    .flatMap(*db request here*) 
    .subscribeOn(Schedulers.io()) 
    .subcribe(subject); 

subject不能接收終止或錯誤。它必須在未來事件中保持活力。

開關與Relay主題:

主題是彌合非的Rx API之間的差距非常有用。但是,它們有損於有狀態:當它們收到onCompleteonError時,它們不再可用於移動數據。這是可觀察到的合同,有時候是期望的行爲。最不 次。

繼電器只是沒有上述屬性的主題。他們使用 可以輕鬆地將非Rx API連接到Rx,並且不用擔心意外觸發終端狀態。


最後比可以輸出0或1項的請求時,使用Maybe

+0

接力是一個好主意,我會檢查它。但0或1不幸的是它不能解決您的建議。 RxJava2不接受'null',它直接拋出NPE。此外,可觀察的DB請求由sqlbrite庫創建,該庫在打開/更新之前保持打開/更新狀態。所以我需要處置或者可能超時關閉它,因爲我只需要這個1-off事件,然後等待下一個將作爲新查詢的「actionId」。感謝你的努力。 – Budius

+0

因爲你不能發送null,所以我正在考慮編寫一個基本的monad來將它的項目或者它的缺席包裝到一個類中,這可能被稱爲Maybe。然後我想起了可能已經作爲可觀察的選擇存在。既然它是完美的搭配,我更新了我的答案。 –