0
我有簡單的流量Flux.generate與消費者和並行
Flux<Long> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next(i);
if (i == 3) sink.complete();
return state;
}, (state) -> System.out.println("state: " + state));
如預期在單個線程其中一期工程:
flux.subscribe(System.out::println);
輸出是
0 1 2 3 state: 4
但是,當我切換到並聯:
flux.parallel().runOn(Schedulers.elastic()).subscribe(System.out::println);
消費者應該打印狀態:號碼不被調用。我只看到:
0 3 2 1
它是一個錯誤或預期的功能?
明白了。但是,當你創建一個Flux並期望得到一些結果但在代碼中的某個地方使用它會改變這種行爲時,這種奇怪的行爲。 –