3

我是ReactiveX庫(我使用它的scala變體,RxScala)的新手。並行執行計算昂貴的映射

我有一個Observable高速發射值。我想將一個函數應用於Observablemap)的所有值。我在map中使用的函數在計算上相當昂貴。

有沒有辦法讓一個線程池並行計算map階段?

+0

顯示一些示例代碼來重現您的問題。 –

回答

7

是的,有一種方法可以做到這一點。

我將緩衝流成塊,並使用Schedulers.computation()在CPU分配負載(基於與大小等於可用的處理器的數目的線程池,其使用一個Executor):

int chunkSize = 1000; 
source 
    .buffer(chunkSize) 
    .flatMap(
    list -> 
     Observable 
     .from(list) 
     .map(expensive) 
     .subscribeOn(Schedulers.computation())) 
... 

如果map操作是足夠昂貴的,你可能就像沒有buffer

source 
    .flatMap(
    x -> 
     Observable 
     .just(x) 
     .map(expensive) 
     .subscribeOn(Schedulers.computation()))