2017-07-18 108 views
2

我正在嘗試創建一個Flowable,它發出關於背壓的事件以避免內存問題,同時並行運行每個轉換階段以提高效率。我創建了一個簡單的測試程序,用於推理我的程序的不同步驟的行爲,以及何時發生事件而不是等待不同階段。使用observeOn時,爲什麼我的RxJava Flowable不尊重背壓?

我的程序如下:

public static void main(String[] args) throws ExecutionException, InterruptedException { 
    Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList()) 
     .stream().map(i -> { 
     System.out.println("emitting:" + i); 
     return i; 
     }); 

    Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator()); 
    System.out.println(String.format("Buffer size: %d", flowable.bufferSize())); 

    Long count = flowable.onBackpressureBuffer(10) 
     .buffer(10) 
     .flatMap(buf -> { 
     System.out.println("Sleeping 500 for batch"); 
     Thread.sleep(500); 
     System.out.println("Got batch of events"); 
     return Flowable.fromIterable(buf); 
     }, 1) 
     .map(x -> x + 1) 
     .doOnNext(i -> { 
     System.out.println(String.format("Sleeping : %d", i)); 
     Thread.sleep(100); 
     System.out.println(i); 
     }) 
     .count() 
     .blockingGet(); 

    System.out.println("count: " + count); 
} 

當我跑,我得到的輸出如預期那樣尊重背壓,其中一批事件被emmited到大小buffer,然後將它們flatmapped,最後是採取了一些行動,他們正在印刷,其中一個接一個:

Buffer size: 128 
emitting:0 
emitting:1 
emitting:2 
emitting:3 
emitting:4 
emitting:5 
emitting:6 
emitting:7 
emitting:8 
emitting:9 
Sleeping 500 for batch 
Got batch of events 
Sleeping : 1 
1 
Sleeping : 2 
2 
Sleeping : 3 
3 
Sleeping : 4 
4 
Sleeping : 5 
5 
Sleeping : 6 
6 
Sleeping : 7 
7 
Sleeping : 8 
8 
Sleeping : 9 
9 
Sleeping : 10 
10 
emitting:10 
emitting:11 
emitting:12 
emitting:13 
emitting:14 
emitting:15 
emitting:16 
emitting:17 
emitting:18 
emitting:19 
Sleeping 500 for batch 
Got batch of events 
Sleeping : 11 
11 
Sleeping : 12 
12 
Sleeping : 13 

但是,如果我試圖通過增加.observeOn(Schedulers.computation())一些調用此並行運行的不同階段,那麼就好像我的程序不再尊重backpre ssure。我的代碼現在看起來像:

public static void main(String[] args) throws ExecutionException, InterruptedException { 
    Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList()) 
     .stream().map(i -> { 
     System.out.println("emitting:" + i); 
     return i; 
     }); 

    Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator()); 
    System.out.println(String.format("Buffer size: %d", flowable.bufferSize())); 

    Long count = flowable.onBackpressureBuffer(10) 
     .buffer(10) 
     .observeOn(Schedulers.computation()) 
     .flatMap(buf -> { 
     System.out.println("Sleeping 500 for batch"); 
     Thread.sleep(500); 
     System.out.println("Got batch of events"); 
     return Flowable.fromIterable(buf); 
     }, 1) 
     .map(x -> x + 1) 
     .observeOn(Schedulers.computation()) 
     .doOnNext(i -> { 
     System.out.println(String.format("Sleeping : %d", i)); 
     Thread.sleep(100); 
     System.out.println(i); 
     }) 
     .observeOn(Schedulers.computation()) 
     .count() 
     .blockingGet(); 

    System.out.println("count: " + count); 
} 

而且我的輸出是下面的,所有我的事件被髮出,而不是尊重由執行各階段規定的背壓和緩衝區前期:

Buffer size: 128 
emitting:0 
emitting:1 
emitting:2 
emitting:3 
emitting:4 
emitting:5 
emitting:6 
emitting:7 
emitting:8 
emitting:9 
emitting:10 
Sleeping 500 for batch 
emitting:11 
emitting:12 
... everything else is emitted here ... 
emitting:998 
emitting:999 
Got batch of events 
Sleeping 500 for batch 
Sleeping : 1 
1 
Sleeping : 2 
2 
Sleeping : 3 
3 
Sleeping : 4 
4 
Sleeping : 5 
Got batch of events 
Sleeping 500 for batch 
5 
Sleeping : 6 
6 
Sleeping : 7 
7 
Sleeping : 8 
8 
Sleeping : 9 
9 
Sleeping : 10 
Got batch of events 
Sleeping 500 for batch 
10 
Sleeping : 11 
11 
Sleeping : 12 
12 
Sleeping : 13 
13 
Sleeping : 14 
14 
Sleeping : 15 
Got batch of events 
Sleeping 500 for batch 
15 
Sleeping : 16 
16 
Sleeping : 17 
17 
Sleeping : 18 
18 
Sleeping : 19 
19 
Sleeping : 20 
Got batch of events 
Sleeping 500 for batch 
20 
Sleeping : 21 
21 
Sleeping : 22 
22 
Sleeping : 23 
23 
Sleeping : 24 
24 
Sleeping : 25 
Got batch of events 
Sleeping 500 for batch 
25 

假裝我的批處理階段正在呼叫外部服務,但我希望他們能夠平行運行,因爲延遲。我也希望在給定的時間內控制內存中的物品數量,因爲最初發出的物品數量可能變化很大,並且批次運行的階段比最初的事件發射速度慢得多。

如何在Scheduler範圍內對Flowable尊重背壓?爲什麼在我撥打observeOn的電話時,似乎不尊重背壓?

回答

3

怎樣纔可以有我的可流動相的背壓跨越調度

實際上,施加onBackpressureBuffer使得源上方它從由下游應用的任何背壓斷開,因爲它是無界的操作員。你不需要它,因爲Flowable.fromIterable(順便說一句,RxJava有一個range運營商)支持並承諾背壓。

爲什麼當我撒上電話觀察時,只是不尊重背壓?

在第一示例中,存在發生所謂調用堆棧阻斷天然背壓。 RxJava默認是同步的,大多數運算符不會引入異步,就像第一個例子中沒有。

observeOn因此在理論上引入了異步邊界,階段可以相互平行運行。它有一個默認的128個元素預取緩衝區,可以通過它的一個過載來調整。然而,在你的情況下,緩衝區(10)實際上會將預取數量放大到1280,這可能仍然會導致1000個元素長的源一次完全消耗。

+0

爲什麼'buffer(10)'將預取量放大到'bufferSize * flowablePrefetchBufferSize'?確實,如果我改變這個發出10,000個整數,而不是第1280個整數後,我可以看到它由於背壓而停止發射,但是,我應該如何收集批量的東西發送到外部服務而不會爆炸我記憶中的物品? –

+0

由於'buffer'理解當它獲得N個列表的請求時,它必須從其上游請求10 * N個元素來滿足該需求。正如我所提到的,你可以通過參數化'observeOn'來減少請求。 – akarnokd

+0

我明白了,感謝所有的幫助! –

相關問題