2017-06-16 44 views
0

我用sqlbrite來監聽表格a和b的變化。並使用combineLatest操作符來組合由sqlbrite生成的觀察值。在BiFunction過程中,observableA和observableB的排放項目。Rxjava2類似於BiFunction背壓的方法

private CompositeDisposable mSubscriptions = new CompositeDisposable(); 
private void initialize(){ 
    QueryObservable observableA = mDb.createQuery("table_a", "select * from table_a", null); 
    QueryObservable observableB = mDb.createQuery("table_b", "select * from table_b", null); 
    ResourceSubscriber subscriber = Flowable.combineLatest(
      RxJavaInterop.toV2Observable(observableA 
        .mapToList(mTableAMapperFunction)).toFlowable(BackpressureStrategy.LATEST) 
      , 
      RxJavaInterop.toV2Observable(observableB 
        .mapToList(mTableBMapperFunction)).toFlowable(BackpressureStrategy.LATEST) 
      , new BiFunction<List<ItemA>, List<ItemB>, List<ResultItem>>() { 
       @Override 
       public List<ResultItem> apply(@io.reactivex.annotations.NonNull List<ItemA> aItems, @io.reactivex.annotations.NonNull List<ItemB> bItems) throws Exception { 
        List<ResultItem> resultItems = convertToResultItems(aItems, bItems); // long process here, convert aItems and bItems to resultItems 
        return resultItems; 
       } 
      } 
    ) 
      .onBackpressureLatest() 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribeWith(new ResourceSubscriber<List<ResultItem>>() { 
       @Override 
       public void onNext(List<ResultItem> resultItems) { 
        adapter.addData(resultItems); 
       } 

       @Override 
       public void onError(Throwable t) { 
       } 

       @Override 
       public void onComplete() { 
       } 
      }); 
    mSubscriptions.add(subscriber); 
} 

問題:如果雙功能運行時間過長(如10秒),即比觀測再觸發間隔(例如觀測觸發每隔1秒),這將導致該雙功能這樣做,因爲我不需要的作品只需要最新的發射物品,但BiFunction是逐個處理髮射的物品,所以BiFunction將處理舊的發射物品,我不需要處理它。 我希望BiFunction跳過舊的發射物品並處理BiFunction中每完成一次應用()的最新發射物品,以減少資源浪費並節省時間。 rxjava的方法類似於BiFunction的背壓方法還是其他解決此問題的方法?

該圖顯示了當前和預期的BiFunction時間線。 figure link

我發現了兩種方法來解決這個問題,但也有缺陷。

方法1:將Pair的「aItems」和「bItems」結合起來,然後將引用傳遞給switchMap並處理作業。

缺陷:switchMap僅向訂閱者發佈最新的項目,但仍然做不必要的工作。

方法2:也結合了「aItems」和「bItems」,然後將引用傳遞給onNext並處理作業。

缺陷:阻塞了UI線程。

+0

首先,你需要確定一個定義垃圾工作準則。 –

+0

如果BiFunction收到最新的項目和過程,這意味着浪費工作 – KpSt

回答

0

你可以只通過對值沿combineLatest的組合功能,並使用observeOn放置計算過的原始來源線程:

.combineLatest(srcA, srcB, (a, b) -> Pair.of(a, b)) 
.onBackpressureLatest() 
.observeOn(Schedulers.computation(), false, 1) 
.map(pair -> compute(pair)) 
.observeOn(AndroidSchedulers.mainThread()) 
... 
+0

我嘗試此方法將計算過程放置到計算線程,但地圖功能仍然處理從可觀察的發射項目一個接一個,地圖功能havn't放棄舊的散發物品。 – KpSt

+0

通過向'observeOn'添加緩衝級別1更新了示例。 – akarnokd

+0

非常感謝。這行得通。 – KpSt

相關問題