2015-10-21 42 views
4

我是RxJava的新手,在(我猜)簡單問題上掙扎。我想在3個線程中模擬地處理訂閱部分。這就是爲什麼我使用FixedThreadPool。示例代碼:如何在Android上使用RxJava在多個線程上運行子系列

Observer.just("one", "two", "three", "four") 
.observeOn(Schedulers.io()) 
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3)) 
.subscribe(new Observer<String>() { 

    public void onNext(String string) { 
     Log.d(TAG, "Started: " + string); 
     SystemClock.sleep(1000); 
     Log.d(TAG, "Ended: " + string); 
    } 

    (...) 

} 

預期結果:

Started: one 
Started: two 
Started: three 
Ended: one 
Started: four 
Ended: two 
Ended: three 
Ended: four 

實際結果:

Started: one 
Ended: one 
Started: two 
Ended: two 
Started: three 
Ended: three 
Started: four 
Ended: four 

我在做什麼錯?

回答

5

RxJava Observables是連續的,subscribeOnobserveOn運算符不會並行運行值。

可以達到最接近的事是通過模鍵分組值,通過observeOn運行它們,然後合併結果:

AtomicInteger count = new AtomicInteger(); 

Observable.range(1, 100) 
.groupBy(v -> count.getAndIncrement() % 3) 
.flatMap(g -> g 
    .observeOn(Schedulers.computation()) 
    .map(v -> Thread.currentThread() + ": " + v)) 
.toBlocking() 
.forEach(System.out::println); 
+0

感謝您的回答。但是,我是否正確理解該解決方案「爲每個線程設置單獨的隊列」,因此如果任務不需要花費相同的時間量,那麼最終某些線程可能會提前完成,而一個線程仍有多個任務需要運行。我的問題是RxJava支持在多個線程之間使用共享隊列嗎? – reinra

+0

這個設置和任何共享隊列都沒有偷工作。 – akarnokd

相關問題