2017-09-03 142 views
2

我用下面的任務試驗,以讓我的頭周圍RxJava的簡單的例子:rxjava2 - 在一個線程池執行任務,訂閱在單個線程

  • 給出的URL列表
  • 待辦事項用於在線程池
  • 對於每個結果每個URL一個HTTP請求中插入一些數據到SQLite數據庫(這裏沒有多線程)
  • 塊中的方法,直到它完成

所以,我想它在科特林:

val ex = Executors.newFixedThreadPool(10) 
Observable.fromIterable((1..100).toList()) 
    .observeOn(Schedulers.from(ex)) 
    .map { Thread.currentThread().name } 
    .subscribe { println(it + " " + Thread.currentThread().name } 

我希望它打印

pool-1-thread-1 main 
pool-1-thread-2 main 
pool-1-thread-3 main 
pool-1-thread-4 main 
.... 

但是它打印:

pool-1-thread-1 pool-1-thread-1 
pool-1-thread-1 pool-1-thread-1 
pool-1-thread-1 pool-1-thread-1 

任何人都可以糾正我關於如何工作的誤解?爲什麼它不使用線程池的所有線程?我如何讓我的訂閱者在主線程上運行或阻塞直到完成?

回答

3

Rx並不是指平行執行服務,因此使用Java的流API。 Rx事件是同步的,隨後將流過流。 observeOn在構建流時會請求線程一次,並在該線程上逐個處理排放。

您還希望subscribe在主線程上執行。 observeOn切換線程,並在該線程上發生所有下游事件。如果您想切換到主線程,則必須在subscribe之前插入另一個observeOn

1

使代碼並行的map塊裏面工作,你應該把它換到可觀察到的與自己的調度:

val ex = Executors.newFixedThreadPool(10) 
    val scheduler = Schedulers.from(ex) 
    Observable.fromIterable((1..100).toList()) 
      .flatMap { 
       Observable 
         .fromCallable { Thread.currentThread().name } 
         .subscribeOn(scheduler) 
      } 
      .subscribe { println(it + " " + Thread.currentThread().name) } 

在這種情況下,你會看到的結果是:

pool-1-thread-1 pool-1-thread-1 
pool-1-thread-2 pool-1-thread-1 
pool-1-thread-3 pool-1-thread-1 
pool-1-thread-4 pool-1-thread-1 
... 

你可以檢查文章RxJava - Achieving Parallelization,給出這種行爲的解釋。

另外,RxJava 2.0.5引入ParallelFlowable API

相關問題