0
任何人都可以解釋爲什麼這段代碼永遠不會結束(總是拋出TimeoutException),3.0.7和3.1.0.M3都一樣嗎? 我認爲剩下的4個線程將逐個完成他們的工作。Spring Reactor調度程序鎖
private static class Holder {
private final int group, idx;
Holder(int group, int idx) {
this.group = group;
this.idx = idx;
}
}
private static final Logger log = LoggerFactory.getLogger(RxTest2.class);
@Test
public void test1() throws Exception {
final Scheduler scheduler = Schedulers.parallel();
final AtomicInteger c = new AtomicInteger();
Flux.range(0, 8).flatMap(gi -> Flux.range(0, 30).map(i -> new Holder(gi, i)))
.groupBy(h -> h.group)
.parallel()
.runOn(scheduler)
.flatMap(chunk -> {
log.debug("chunk {}", chunk.key());
chunk
.parallel()
.runOn(scheduler)
.flatMap(h -> {
log.debug("{} - {}", h.group, h.idx);
c.incrementAndGet();
return Mono.just(true);
})
.sequential()
.blockLast();
return Mono.just(true);
})
.sequential()
.timeout(Duration.ofSeconds(2))
.doOnTerminate(() -> log.debug("count: {}", c.get()))
.blockLast();
}
,如果我把它拆分分離調度(Schedulers.newParallel( 「P1」,8),Schedulers.newParallel( 「P2」)) - 一切正常
是否有計數一些特殊規則調度程序的線程數量?
感謝您的回覆,並對我長久的沉默抱歉! 假設我們設置了: ... final Scheduler scheduler = Schedulers.parallel(16); ... .parallel(8) .runOn(調度) ... .parallel(4) .runOn(調度) 情況還是一樣。 我想:首先_parallel_塊8個線程,其他任務可以使用剩餘的8個線程。內部任務很短。工作應該沒問題。 我想明白我爲什麼錯了。什麼可能會阻塞剩餘線程 – user1222796