我正在嘗試創建一個Flowable
,它發出關於背壓的事件以避免內存問題,同時並行運行每個轉換階段以提高效率。我創建了一個簡單的測試程序,用於推理我的程序的不同步驟的行爲,以及何時發生事件而不是等待不同階段。使用observeOn時,爲什麼我的RxJava Flowable不尊重背壓?
我的程序如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));
Long count = flowable.onBackpressureBuffer(10)
.buffer(10)
.flatMap(buf -> {
System.out.println("Sleeping 500 for batch");
Thread.sleep(500);
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
}, 1)
.map(x -> x + 1)
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.count()
.blockingGet();
System.out.println("count: " + count);
}
當我跑,我得到的輸出如預期那樣尊重背壓,其中一批事件被emmited到大小buffer
,然後將它們flatmapped,最後是採取了一些行動,他們正在印刷,其中一個接一個:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
Sleeping 500 for batch
Got batch of events
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
10
emitting:10
emitting:11
emitting:12
emitting:13
emitting:14
emitting:15
emitting:16
emitting:17
emitting:18
emitting:19
Sleeping 500 for batch
Got batch of events
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
但是,如果我試圖通過增加.observeOn(Schedulers.computation())
一些調用此並行運行的不同階段,那麼就好像我的程序不再尊重backpre ssure。我的代碼現在看起來像:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));
Long count = flowable.onBackpressureBuffer(10)
.buffer(10)
.observeOn(Schedulers.computation())
.flatMap(buf -> {
System.out.println("Sleeping 500 for batch");
Thread.sleep(500);
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
}, 1)
.map(x -> x + 1)
.observeOn(Schedulers.computation())
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.observeOn(Schedulers.computation())
.count()
.blockingGet();
System.out.println("count: " + count);
}
而且我的輸出是下面的,所有我的事件被髮出,而不是尊重由執行各階段規定的背壓和緩衝區前期:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
emitting:10
Sleeping 500 for batch
emitting:11
emitting:12
... everything else is emitted here ...
emitting:998
emitting:999
Got batch of events
Sleeping 500 for batch
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
Got batch of events
Sleeping 500 for batch
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
Got batch of events
Sleeping 500 for batch
10
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
13
Sleeping : 14
14
Sleeping : 15
Got batch of events
Sleeping 500 for batch
15
Sleeping : 16
16
Sleeping : 17
17
Sleeping : 18
18
Sleeping : 19
19
Sleeping : 20
Got batch of events
Sleeping 500 for batch
20
Sleeping : 21
21
Sleeping : 22
22
Sleeping : 23
23
Sleeping : 24
24
Sleeping : 25
Got batch of events
Sleeping 500 for batch
25
假裝我的批處理階段正在呼叫外部服務,但我希望他們能夠平行運行,因爲延遲。我也希望在給定的時間內控制內存中的物品數量,因爲最初發出的物品數量可能變化很大,並且批次運行的階段比最初的事件發射速度慢得多。
如何在Scheduler
範圍內對Flowable
尊重背壓?爲什麼在我撥打observeOn
的電話時,似乎不尊重背壓?
爲什麼'buffer(10)'將預取量放大到'bufferSize * flowablePrefetchBufferSize'?確實,如果我改變這個發出10,000個整數,而不是第1280個整數後,我可以看到它由於背壓而停止發射,但是,我應該如何收集批量的東西發送到外部服務而不會爆炸我記憶中的物品? –
由於'buffer'理解當它獲得N個列表的請求時,它必須從其上游請求10 * N個元素來滿足該需求。正如我所提到的,你可以通過參數化'observeOn'來減少請求。 – akarnokd
我明白了,感謝所有的幫助! –