2016-11-07 16 views
0

我正在學習RxJava,所以請溫柔。我看過教程,完成了閱讀,然後搜索到了,但是,我仍然在改變我的AsyncTaskLoader時遇到了一些問題。出於某種原因,我無法找到一種操作者的模式來完成我的任務(儘管我認爲這是常見的)。我想要做的是以下內容:返回我的片段可以訂閱的Observable。觀察者應該在訂閱時執行以下操作:RxJava模式返回冷結果,做更多的工作,然後返回熱門結果

1)通過執行2個查詢,運行一些邏輯並返回結果,從本地數據庫獲取數據;
2)從API獲取數據;
3)將新的API數據與數據庫同步;
4)重複第一步並返回結果;

到目前爲止,我已經將我的數據庫調用和API調用轉換爲可觀察對象。我試圖理解我如何能夠散發出冷酷的結果並繼續進行連鎖。我可以分別保留這兩個操作,並使用相同的訂閱者來訂閱這兩個操作?但我不確定如果我的新的裝載器替換類返回一個可觀察對象將會如何工作......另外,我並不需要處理第二個可觀察對象的結果 - 我只需要第一個觀察對象第二個完成。

到目前爲止,我有以下幾點:

public Observable<StuffFetchResult> getColdStuff() { 
    return Observable.zip(mDataSource.listStuff(), mDataSource.listOtherStuff(), 
      (stuff, moreStuff) -> { 
       List<Stuff> mergedList = new ArrayList<>(); 
       // do some merging stuff 
       return new StuffFetchResult(mergedList); 
      }).subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()); 
} 

假設我也有getHotStuff(),會做的API調用,並與數據庫中的同步,如果這是正確的做法,並返回相同的Observable。但是,我堅持下一步 - 如何在不添加其他用戶的情況下重新啓動第一個可觀察節點以重播hotStuff一次?

編輯:

我已經取得了一些進展,我想所有我現在需要的是加入了這一切。我有兩種方法:

1)getColdStuff()是非常以上
2)getHotStuff()描述會做打電話的API,與數據庫同步,並返回一個可觀察。想法是在getHotStuff()完成後再次調用getColdStuff()以刷新UI,因此可以忽略從getHotStuff()返回的實際結果。所有需要做的就是一旦完成就觸發getColdStuff()

我試着在答案的建議,並創建以下:

BehaviorRelay<Observable<StuffFetchResult>> callSequence = BehaviorRelay.create(); 
Observable<StuffFetchResult> valueSequence = Observable.switchOnNextDelayError(callSequence.toSerialized()); 
valueSequence.subscribe(new Subscriber<StuffFetchResult>() { 
    @Override 
    public void onCompleted() {} 

    @Override 
    public void onError(Throwable e) {} 

    @Override 
    public void onNext(StuffFetchResult result) { 
     // UI stuff 
    } 
}); 
callSequence.call(loader.getColdStuff()); 

我可以訂閱valueSequence這裏使用callSequence.call(loader.getColdStuff());,這將在我的訂閱onNext()運行第一個方法和產生的結果,我可以將它用於我的用戶界面。但是,我不知道如何並行運行getHotStuff(),並在返回時對其執行不同的操作。還getHotStuff()返回不同類型的Observable,所以我不能真的使用相同的callSequence

EDIT 2

使用兩個訂戶,我能達到所要求的行爲,我認爲。不確定這是否正確。

loader.getHotStuff() 
    .subscribeOn(Schedulers.io()) 
    .subscribe(new Subscriber<Object>() { 
     @Override 
     public void onCompleted() {} 

     @Override 
     public void onError(Throwable e) {} 

     @Override 
     public void onNext(Object stuffWeDontCareAbout) { 
      callSequence.call(loader.getColdStuff()); 
     } 
    }); 

回答

0

如果我正確理解您的情況,您可能希望類似的東西 -

BehaviorSubject<Observable<T> callSequence = BehaviorSubject.create(); 
Observable<T> valueSequence = Observable.swithOnNextDelayError(callSequence.toSerialized()); 

您的用戶將聽valueSequence,每當你需要「重啓」,你會稱之爲 -

callSequence.onNext(call.cache()); // *call* is Observable<T> 

(我離開.subscribeOn/.observeOn配置給你)

+0

謝謝你的回答。請您詳細說明一下,因爲我不確定如何使用您的建議?另外我注意到,我的方法getHotStuff和getColdStuff返回不同類型的可觀察對象 - 最初getHotStuff將從API中獲取,持久化,然後調用getColdStuff從存儲中檢索。所以大概他們不能都在'callSequence'中?我正在尋找的行爲是:'getCold - > resultsToUI - > getHot - > persist - > getCold - resultsToUI' – vkislicins

+0

對於副作用,如堅持存儲,.doOnNext()是適當的地方。如果第一個數據源爲空,則使用替代數據源.switchIfEmpty(alternate)是典型的解決方案。所以整個鏈會像storage.switchIfEmpty(apiCall.doOnNext(persist()));不同類型的observable幾乎不是問題,您可以使用.map() –

+0

再次感謝。我仍然試圖把它放在一起..並不是我需要使用替代數據源,實際上是相反的 - 我需要使用兩者,首先顯示來自過時來源的數據,然後在有新數據時更新。但我不確定如何在單一流程中將其全部連接起來。我會用最新的代碼更新我的問題,以防你可能有一分鐘時間看看。謝謝 – vkislicins