2016-11-12 26 views
2

我面對很奇怪的RxJava行爲,我不明白。RxJava調度程序不會改變線程與睡眠

假設我想要並行處理元素。我使用flatMap爲:

public static void log(String msg) { 
    String threadName = Thread.currentThread().getName(); 
    System.out.println(String.format("%s - %s", threadName, msg)); 
} 

public static void sleep(int ms) { 
    try { 
     Thread.sleep(ms); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

public static void main(String[] args) throws InterruptedException { 

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1)); 
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5)); 

    Observable.create(s -> { 
     while (true) { 
      log("start"); 
      s.onNext(Math.random()); 
      sleep(10); 
     } 
    }).subscribeOn(sA) 
      .flatMap(r -> Observable.just(r).subscribeOn(sB)) 
      .doOnNext(r -> log("process")) 
      .subscribe((r) -> log("finish")); 
} 

輸出是相當可預測的:

pool-1-thread-1 - start 
pool-2-thread-1 - process 
pool-2-thread-1 - finish 
pool-1-thread-1 - start 
pool-2-thread-2 - process 
pool-2-thread-2 - finish 
pool-1-thread-1 - start 
pool-2-thread-3 - process 
pool-2-thread-3 - finish 

好吧好吧,但是如果我有n> 10補充睡眠的地圖flatMap並行調度停止變化後的線程。

public static void main(String[] args) throws InterruptedException { 

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1)); 
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5)); 

    Observable.create(s -> { 
     while (true) { 
      log("start"); 
      s.onNext(Math.random()); 
      sleep(10); 
     } 
    }).subscribeOn(sA) 
      .flatMap(r -> Observable.just(r).subscribeOn(sB)) 
      .doOnNext(r -> sleep(15)) 
      .doOnNext(r -> log("process")) 
      .subscribe((r) -> log("finish")); 
} 

是什麼給了以下內容:

pool-1-thread-1 - start 
pool-1-thread-1 - start 
pool-2-thread-1 - process 
pool-2-thread-1 - finish 
pool-1-thread-1 - start 
pool-1-thread-1 - start 
pool-2-thread-1 - process 
pool-2-thread-1 - finish 
pool-1-thread-1 - start 
pool-2-thread-1 - process 

爲什麼???爲什麼所有元素都在flatMap之後在同一個線程(pool-2-thread-1)中進行處理?

回答

2

FlatMap將任何並行任務序列化回單個線程,並且您正在窺視此線程。試試這個代替

public static void main(String[] args) throws InterruptedException { 

Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1)); 
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5)); 

Observable.create(s -> { 
    while (!s.isUnsubscribed()) { 
     log("start"); 
     s.onNext(Math.random()); 
     sleep(10); 
    } 
}).subscribeOn(sA) 
     .flatMap(r -> 
      Observable.just(r) 
      .subscribeOn(sB) 
      .doOnNext(r -> sleep(15)) 
      .doOnNext(r -> log("process")) 
     ) 
     .subscribe((r) -> log("finish")); 
} 
+0

謝謝。而且爲什麼我沒有在沒有睡眠的情況下偷看這個線程呢? – corvax

+0

源之間存在非確定性的排放競爭,某些線程也可能發出其他線程的元素。 – akarnokd