2017-08-15 79 views
0

我試圖去與RxJava錯綜複雜的抓地力,但打了一個新手的問​​題:RxJava:熱觀測量的消費者

我試圖從一冷一營造熱銷觀察到的,並訂閱兩位消費者哪些進程以不同的速度推送事件。這裏是一個代碼片段:

ConnectableObservable<Long> ob = Observable.interval(200, TimeUnit.MILLISECONDS) 
    .publish(); 
ob.connect();  

Consumer<Long> withSleep = (Long t) -> { 
    System.out.println("Second : " + t); 
    sleep(1); 
}; 

Consumer<Long> noSleep = (Long t) -> { 
    System.out.println("First : " + t); 
}; 

sleep(2); 

ob.observeOn(Schedulers.newThread()).subscribe(noSleep); 

ob.observeOn(Schedulers.newThread()).subscribe(withSleep); 

sleep(5); 

sleep(2)只是爲了看看observable是否已經開始發射。事實上,這個打印最初如預期的那樣。

  1. 第一:10
  2. 第二:10
  3. 第一:11
  4. 第一:12
  5. 第一:13
  6. 第一:14
  7. 第二:11
  8. 第一個:15
  9. 第一:16
  10. 第一:17

但隨後的第二消費者(具有較長的處理時間,由1秒睡眠模擬的),拾起在序列的情況下(輸出線7),而不是目前的事件(沒有。 14),正如我期望從一個熱門的可觀察者那樣。不管是熱門的觀察者,不管用戶,還是訂閱者在選擇什麼時候推送什麼(假設沒有特定的,明確的背壓策略),都只是繼續開火?

我需要改變什麼才能讓第二個消費者簡單地拿起當前生成的任何東西(即在上例中顯示14而不是11)?

任何幫助將不勝感激。

回答

2

熱或冷,運營商如發佈和觀察在訂閱後保持連續,因此即使處理時間超過排放速率,您也將獲得所有事件。

爲了避免在第二種情況下處理舊的條目,你必須連鎖經營是降事件,並且不緩存太多之一:

ob.toFlowable(BackpressureStrategy.DROP) 
    .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread()) 
    .rebatchRequests(1) 
    .subscribe(withSleep); 

ob.toFlowable(BackpressureStrategy.LATEST) 
    .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread()) 
    .rebatchRequests(1) 
    .subscribe(withSleep); 
+0

謝謝,這樣做的招。但我不明白爲什麼需要延遲操作符(沒有它,當然不起作用)。我認爲這是爲了一些需要的副作用(緩衝區,線程?),但它看起來有點做作,而不是我期望從這個操作符。另外,似乎刪除延遲運算符中的調度程序會產生相同的結果,或者我錯過了什麼? –

+0

延遲不會緩衝或干擾背壓請求,因此不會像'observeOn'那樣預取數據,讓流實際上跳過處理項目時發出的數據。 – akarnokd