2016-10-29 59 views
64

我一直在尋找新的RX的Java 2,我不是很確定我理解的backpressure了的想法...可觀察VS可流動rxJava2

我知道,我們有Observable不具有backpressure支持和Flowable有它。

因此,基於例如,可以說我有flowableinterval

 Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Consumer<Long>() { 
       @Override 
       public void accept(Long aLong) throws Exception { 
        // do smth 
       } 
      }); 

這是怎麼回事後,約128價值崩潰,並且那是很明顯的,我比消費項目越來越慢。

但是,我們有相同帶Observable

 Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Consumer<Long>() { 
       @Override 
       public void accept(Long aLong) throws Exception { 
        // do smth 
       } 
      }); 

這不會在所有的崩潰,甚至當我穿上消耗它仍然有效一些延遲。爲了使Flowable工作可以說我把onBackpressureDrop運算符,崩潰消失了,但並不是所有的值都發射。

因此,我目前找不到答案的基本問題是爲什麼我應該關心backpressure當我可以使用普通Observable仍然接收所有值而不管理buffer?或者從另一方面來看,backpressure有利於管理和處理消費?

回答

61

實踐中的背壓清單是有界的緩衝區,Flowable.observeOn有128個元素的緩衝區,它們可以像下游流一樣快地排空。您可以單獨增加緩衝區大小來處理突發源,所有背壓管理實踐仍然適用於1.x. Observable.observeOn有一個無限制的緩衝區,不斷收集元素,你的應用可能會耗盡內存。

您可以使用Observable例如:

  • 處理GUI事件
  • 與短序列(小於1000種元素總)

工作,則可以使用Flowable例如:

  • 冷和非定時來源
  • 發生器狀光源
  • 網絡和數據庫訪問器
+0

由於這已經出現[在另一個問題](http://stackoverflow.com/questions/42525623/rxjava-2-x-should-i-use-flowable-or-single -completable) - 當它們在語義上合適時,可以總是*使用'Maybe','Single'和'Completable'來代替'Flowable',這是否正確? –

8

事實上,您的Flowable在發射128個值後沒有背壓處理而墜毀並不意味着它在128個值後總會崩潰:有時它會在10之後崩潰,有時甚至不會崩潰。我相信這是發生在您嘗試使用Observable的示例時發生的情況 - 碰巧沒有背壓,所以您的代碼正常工作,下次可能不會。 RxJava 2的不同之處在於,Observable已不存在背壓概念,無法處理它。如果您正在設計可能需要明確反壓處理的反應序列,那麼Flowable是您的最佳選擇。

+0

是我觀察到,有時後不太值破門,有時但是,如果例如我只處理沒有'backpressure'的'interval',我會期待一些奇怪的行爲或問題嗎? – user2141889

+0

如果你確定無法在特定的Observable序列中發生背壓問題 - 那麼我推測可以忽略背壓是好的。 – Egor

26

背壓是當你觀察到的(發佈者)正在創造更多的事件比你的用戶可以處理。因此,您可以讓訂閱者丟失事件,或者您可能會遇到大量導致內存不足的事件。 Flowable考慮到背壓。 Observable沒有。而已。

它讓我想起了漏斗,當它有太多的液體溢出。

着巨大的反壓:可流動可不能使這種情況發生幫助

enter image description here

但用流動的,有少得多背壓:

enter image description here

Rxjava2有幾個您可以根據自己的用例使用背壓策略。通過策略,我的意思是Rxjava2提供了一種方法來處理由於溢出(背壓)而無法處理的對象。

here are the strategies. 我不會去通過他們所有,但例如,如果你想不用擔心被溢出,你可以使用這樣的下降戰略項目:

observable.toFlowable(BackpressureStrategy.DROP)

據我所知,隊列中應該有128個項目的限制,之後可能會有溢出(背壓)。即使它不接近這個數字。希望這可以幫助某人。

,如果你需要從128看起來是可以做到的 這樣更改緩衝區大小(但觀看任何內存限制:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower. 
+0

我一直認爲backpressure是一個機制的名稱,它可以讓消費者通知生產商減速... – kboom

+0

可能是這樣。是 – j2emanue

+0

使用Flowable有什麼缺點嗎? –