我有一段代碼,他的工作是更新本地緩存。有兩個觸發該緩存更新:如何「強制」某種背壓以避免rxjava中的多次執行?
- 在固定的時間間隔
- 當請求
因此,這裏是我如何做這一個基本的例子。
forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
.merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
.flatMap(new Func1<Long, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Long t) {
return reloadData(); // operation that may take long
}
})
.publish();
dataUpdates.subscribe();
dataUpdates.connect();
再後來我有
public void forceReload() {
final CountDownLatch cdl = new CountDownLatch(1);
dataUpdates
.take(1)
.subscribe(
new Action1<Boolean>() {
@Override
public void call(Boolean b) {
cdl.countDown();
}
}
);
forceReloadEvents.onNext(-1L);
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
這工作,但問題是,當我開始有forceReload()
多個併發呼叫:會有的reloadData()
沒有併發執行,但內容將排隊和該過程將循環重新加載數據,直到發送到forceReloadEvents
的所有事件都被消耗盡管forceReload()
已經完成,因爲先前的事件釋放了CountDownLatch
。
我想使用onBackPressureDrop
但似乎沒有誘發背壓,沒有東西被丟棄。我想要的是強制背壓的一種方式,以便合併理解一次只能處理一個元素,並且必須刪除任何後續事件,直到當前執行完成。
我還考慮過使用buffer
或throttleFirst
,但我不想強制每個事件之間的特定時間,我寧願有這種自動縮放,取決於重新加載緩存所需的時間。您可以將其視爲throttleFirst
,直到reloadData
已完成。
我不一定要匹配'onNext'和成功重新加載的發射。我只是想確保我在*之後調用onNext即可獲得成功的重載*,即使它是當我在onNext上調用時正在運行的那個。 'throttleFirst'讓我困擾,好像重新加載只需要500毫秒,然後所有在500和1000毫秒之間的onNext調用都不起作用,'forceReload'將在CDL釋放之前等待下一個事件。 – Crystark
我放入油門首先要避免重新加載風暴,但是你不必使用它。 – akarnokd
那麼,這正是我想要避免的:重新加載風暴。但是沒有定義一個特定的定時器來避免'throttleFirst'的重載。我希望''flatMap'在執行'reloadData'時獲得一個事件的時候產生「backpressure」。這可以解釋爲「節流,直到當前的'reloadData'結束」。 – Crystark