2017-07-17 65 views
1

我放在一起這個虛擬實例,試圖瞭解backpressure好一點:瞭解流動背壓rxjava2

Flowable.range(1, 100).onBackpressureDrop() 
         .subscribeOn(Schedulers.io()) 
         .observeOn(AndroidSchedulers.mainThread()) 
         .subscribeWith(object : DisposableSubscriber<Int>() { 
         override fun onStart() { 
          request(1) 
         } 

         override fun onComplete() { 
          Log.d([email protected]::class.java.simpleName, "onComplete") 
         } 

         override fun onNext(t: Int?) { 
          Log.d([email protected]::class.java.simpleName, t.toString()) 
          Thread.sleep(1000) 
          request(1) 
         } 

         override fun onError(t: Throwable?) { //handle error} 
         }) 

我有一個非常緩慢的Subscriber從一個非常快的Flowable消耗數據。我正在指示Flowable onBackPressureDrop()。儘管這樣,我的輸出看起來像這樣(從1到100)

07-16 23:07:21.097 22389-22389 D: 1 
07-16 23:07:22.100 22389-22389 D: 2 
07-16 23:07:23.102 22389-22389 D: 3 
07-16 23:07:24.104 22389-22389 D: ... 
07-16 23:07:24.104 22389-22389 D: ... 
07-16 23:07:24.105 22389-22389 D: 99 
07-16 23:07:25.105 22389-22389 D: 100 
07-16 23:07:25.107 22389-22389 D: onComplete 

我期待缺失的元素,因爲用戶是極其緩慢的,但事實並非如此,在所有打印從1到100號到控制檯,每秒一次。

接下來,我試圖一次請求所有值。所以我將request(1)替換爲onStartrequest(Long.MAX_VALUE),並從onNext調用中刪除request(1)。但它仍然打印數字1到100,沒有丟失元素。

所以我想知道如何才能模擬一個用戶緩慢的用戶失蹤事件? 我怎樣才能使反壓異常發生?

感謝

回答

3

observeOn有128默認的內部緩衝區的大小,這就是爲什麼你沒有看到下降的元素,因爲它可以簡單地緩衝了所有你生成的100個元素。您可以通過observeOn(mainThread(), false, 1)將緩衝區大小設置爲1,並且體驗下降。

+0

這樣做!謝謝。我遇到滴,但只有'.onBackPressureDrop()'操作符存在。爲什麼我刪除它時不會得到異常?即使緩衝區已滿,它也會正常打印所有值。 – feresr

+1

因爲'Flowable.range'支持背壓,並且可以與'observeOn'互操作,不管緩衝區有多大,也不會發信號通知MBE。順便說一句,每個運算符在Flowable上都有一個** Backpressure ** javadoc條目來解釋它的行爲。 – akarnokd