3
我是ReactiveX庫(我使用它的scala變體,RxScala)的新手。並行執行計算昂貴的映射
我有一個Observable
高速發射值。我想將一個函數應用於Observable
(map
)的所有值。我在map
中使用的函數在計算上相當昂貴。
有沒有辦法讓一個線程池並行計算map
階段?
我是ReactiveX庫(我使用它的scala變體,RxScala)的新手。並行執行計算昂貴的映射
我有一個Observable
高速發射值。我想將一個函數應用於Observable
(map
)的所有值。我在map
中使用的函數在計算上相當昂貴。
有沒有辦法讓一個線程池並行計算map
階段?
是的,有一種方法可以做到這一點。
我將緩衝流成塊,並使用Schedulers.computation()
在CPU分配負載(基於與大小等於可用的處理器的數目的線程池,其使用一個Executor
):
int chunkSize = 1000;
source
.buffer(chunkSize)
.flatMap(
list ->
Observable
.from(list)
.map(expensive)
.subscribeOn(Schedulers.computation()))
...
如果map
操作是足夠昂貴的,你可能就像沒有buffer
:
source
.flatMap(
x ->
Observable
.just(x)
.map(expensive)
.subscribeOn(Schedulers.computation()))
顯示一些示例代碼來重現您的問題。 –