2017-08-07 81 views
0

任何人都可以解釋爲什麼這段代碼永遠不會結束(總是拋出TimeoutException),3.0.7和3.1.0.M3都一樣嗎? 我認爲剩下的4個線程將逐個完成他們的工作。Spring Reactor調度程序鎖

private static class Holder { 
    private final int group, idx; 

    Holder(int group, int idx) { 
     this.group = group; 
     this.idx = idx; 
    } 
} 

private static final Logger log = LoggerFactory.getLogger(RxTest2.class); 

@Test 
public void test1() throws Exception { 
    final Scheduler scheduler = Schedulers.parallel(); 
    final AtomicInteger c = new AtomicInteger(); 

    Flux.range(0, 8).flatMap(gi -> Flux.range(0, 30).map(i -> new Holder(gi, i))) 
     .groupBy(h -> h.group) 

     .parallel() 
     .runOn(scheduler) 

     .flatMap(chunk -> { 

      log.debug("chunk {}", chunk.key()); 

      chunk 
       .parallel() 
       .runOn(scheduler) 
       .flatMap(h -> { 
        log.debug("{} - {}", h.group, h.idx); 
        c.incrementAndGet(); 
        return Mono.just(true); 
       }) 
       .sequential() 
       .blockLast(); 

      return Mono.just(true); 
     }) 
     .sequential() 
     .timeout(Duration.ofSeconds(2)) 
     .doOnTerminate(() -> log.debug("count: {}", c.get())) 
     .blockLast(); 
} 

,如果我把它拆分分離調度(Schedulers.newParallel( 「P1」,8),Schedulers.newParallel( 「P2」)) - 一切正常

是否有計數一些特殊規則調度程序的線程數量?

回答

0

你似乎對Schedulers.parallel()運行多個並行計算,他們完全飽和的調度,所以沒有留下線程處理.flatMap(chunk ->。我們在這裏有線程飢餓的情況。只要刪除內部chunk.parallel,沒有必要這樣做,因爲計算已經平行。

+0

感謝您的回覆,並對我長久的沉默抱歉! 假設我們設置了: ... final Scheduler scheduler = Schedulers.parallel(16); ... .parallel(8) .runOn(調度) ... .parallel(4) .runOn(調度) 情況還是一樣。 我想:首先_parallel_塊8個線程,其他任務可以使用剩餘的8個線程。內部任務很短。工作應該沒問題。 我想明白我爲什麼錯了。什麼可能會阻塞剩餘線程 – user1222796