2017-08-11 16 views
3

我有一個關於RxJava2的問題。我想在固定線程池的不同線程中運行消費者以並行執行List結果。這裏是我的代碼:如何在RxJava2中並行執行消費者?

List<String> letters = Lists.newArrayList("a","b","c","d","e","f","g"); 
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(letters.size()); 
    Observable.fromIterable(letters).observeOn(Schedulers.from(fixedThreadPool)).forEach(new Consumer<String>() { 
     @Override 
     public void accept(String data) throws Exception { 
      System.out.println(data + " forEach, thread is " + Thread.currentThread().getName()); 
     } 
    }); 

我得到的結果是:

a forEach, thread is pool-1-thread-1 
b forEach, thread is pool-1-thread-1 
c forEach, thread is pool-1-thread-1 
d forEach, thread is pool-1-thread-1 
e forEach, thread is pool-1-thread-1 
f forEach, thread is pool-1-thread-1 
g forEach, thread is pool-1-thread-1 

但實際上我想是這樣的結果,每個並行consumor執行在不同的線程:

a forEach, thread is pool-1-thread-1 
b forEach, thread is pool-1-thread-2 
c forEach, thread is pool-1-thread-3 
d forEach, thread is pool-1-thread-4 
e forEach, thread is pool-1-thread-5 
f forEach, thread is pool-1-thread-6 
g forEach, thread is pool-1-thread-7 

能有人告訴我如何讓它發生?

回答

4

爲了讀取並行線程中的項目,使用Flowable <>而不是Observable,因爲它提供了並行運算符。 例如:

Flowable.fromIterable(letters) 
     .parallel(letters.size()) 
     .runOn(Schedulers.from(fixedThreadPool)) 
     .sequential() 
     .forEach(data -> System.out.println(data + " forEach, thread is " + 
          Thread.currentThread().getName())); 

正如你不能預測該線程中的一個將被用於每次調用時,輸出可以變化。在我的測試用例我

c forEach, thread is pool-1-thread-3 
g forEach, thread is pool-1-thread-7 
a forEach, thread is pool-1-thread-1 
e forEach, thread is pool-1-thread-5 
d forEach, thread is pool-1-thread-4 
b forEach, thread is pool-1-thread-2 
f forEach, thread is pool-1-thread-6 

欲瞭解更多信息,請諮詢the parallel-flows section of the RxJava Wiki

+2

不要做'.toFlowable(BackpressureStrategy.MISSING)'和'平行()'!另外如果你有一個迭代器,爲什麼不使用'Flowable.fromIterable'並且你可以免費得到背壓? – akarnokd

+0

採取@akarnokd點。將改變代碼片段 – Zapodot

相關問題