2015-06-15 57 views
1

我有一段代碼,他的工作是更新本地緩存。有兩個觸發該緩存更新:如何「強制」某種背壓以避免rxjava中的多次執行?

  1. 在固定的時間間隔
  2. 當請求

因此,這裏是我如何做這一個基本的例子。

forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create()); 
dataUpdates = Observable 
    .merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS)) 
    .flatMap(new Func1<Long, Observable<Boolean>>() { 
     @Override 
     public Observable<Boolean> call(Long t) { 
      return reloadData(); // operation that may take long 
     } 
    }) 
    .publish(); 

dataUpdates.subscribe(); 
dataUpdates.connect(); 

再後來我有

public void forceReload() { 
    final CountDownLatch cdl = new CountDownLatch(1); 

    dataUpdates 
     .take(1) 
     .subscribe(
      new Action1<Boolean>() { 
       @Override 
       public void call(Boolean b) { 
        cdl.countDown(); 
       } 
      } 
     ); 

    forceReloadEvents.onNext(-1L); 

    try { 
     cdl.await(); 
    } catch (InterruptedException e) { 
     throw new RuntimeException(e); 
    } 
} 

這工作,但問題是,當我開始有forceReload()多個併發呼叫:會有的reloadData()沒有併發執行,但內容將排隊和該過程將循環重新加載數據,直到發送到forceReloadEvents的所有事件都被消耗盡管forceReload()已經完成,因爲先前的事件釋放了CountDownLatch

我想使用onBackPressureDrop但似乎沒有誘發背壓,沒有東西被丟棄。我想要的是強制背壓的一種方式,以便合併理解一次只能處理一個元素,並且必須刪除任何後續事件,直到當前執行完成。

我還考慮過使用bufferthrottleFirst,但我不想強制每個事件之間的特定時間,我寧願有這種自動縮放,取決於重新加載緩存所需的時間。您可以將其視爲throttleFirst,直到reloadData已完成。

回答

2

編輯:根據的評論,你可以有一個的AtomicBoolean作爲flatMap門無法啓動重載,直到門再次打開:

public class AvoidReloadStorm { 
    static Observable<Boolean> reload() { 
     return Observable.just(true) 
     .doOnNext(v -> System.out.println("Reload started...")) 
     .delay(10, TimeUnit.SECONDS) 
     .doOnNext(v -> System.out.println("Reloaded")); 
    } 
    public static void main(String[] args) throws Exception { 
     Subject<Long, Long> manual = PublishSubject.<Long>create().toSerialized(); 
     Observable<Long> timer = Observable.timer(0, 5, TimeUnit.SECONDS) 
       .doOnNext(v -> System.out.println("Timer reload")); 

     AtomicBoolean running = new AtomicBoolean(); 

     ConnectableObservable<Boolean> src = Observable 
     .merge(manual.onBackpressureDrop(), timer.onBackpressureDrop()) 
     .observeOn(Schedulers.io()) 
     .flatMap(v -> { 
      if (running.compareAndSet(false, true)) { 
       return reload().doOnCompleted(() -> { 
        running.set(false); 
       }); 
      } 
      System.out.println("Reload rejected"); 
      return Observable.empty(); 
     }).publish(); 

     src.subscribe(System.out::println); 

     src.connect(); 

     Thread.sleep(100000); 
    } 
} 
+0

我不一定要匹配'onNext'和成功重新加載的發射。我只是想確保我在*之後調用onNext即可獲得成功的重載*,即使它是當我在onNext上調用時正在運行的那個。 'throttleFirst'讓我困擾,好像重新加載只需要500毫秒,然後所有在500和1000毫秒之間的onNext調用都不起作用,'forceReload'將在CDL釋放之前等待下一個事件。 – Crystark

+0

我放入油門首先要避免重新加載風暴,但是你不必使用它。 – akarnokd

+0

那麼,這正是我想要避免的:重新加載風暴。但是沒有定義一個特定的定時器來避免'throttleFirst'的重載。我希望''flatMap'在執行'reloadData'時獲得一個事件的時候產生「backpressure」。這可以解釋爲「節流,直到當前的'reloadData'結束」。 – Crystark

2

我做了這個工作,感謝akarnokd !

這裏是我創建基於他的答案的解決方案:

Observable<Long> forceReloadEvents = this.forceReloadEvents 
    .asObservable() 
    .onBackpressureDrop(); 

Observable<Long> periodicReload = Observable 
    .timer(0, pullInterval, TimeUnit.SECONDS) 
    .onBackpressureDrop(); 

final AtomicBoolean running = new AtomicBoolean(); 

dataUpdates = Observable 
    .merge(forceReloadEvents, periodicReload) 
    .filter(new Func1<Long, Boolean>() { 
     @Override 
     public Boolean call(Long t) { 
      return running.compareAndSet(false, true); 
     } 
    }) 
    .observeOn(Schedulers.io()) 
    .flatMap(new Func1<Long, Observable<Boolean>>() { 
     @Override 
     public Observable<Boolean> call(Long t) { 
      return reloadData(); 
     } 
    }) 
    .doOnNext(new Action1<Boolean>() { 
     @Override 
     public void call(Boolean t) { 
      running.set(false); 
     } 
    }) 
    .publish(); 

dataUpdates.subscribe(); 
dataUpdates.connect(); 

我不知道onBackpressureDrop是有用的,但在這裏我將它作爲一項預防措施。

forceReload代碼不會更改。