2017-10-04 68 views
0

我有簡單的流量Flux.generate與消費者和並行

Flux<Long> flux = Flux.generate(
      AtomicLong::new, 
      (state, sink) -> { 
       long i = state.getAndIncrement(); 
       sink.next(i); 
       if (i == 3) sink.complete(); 
       return state; 
      }, (state) -> System.out.println("state: " + state)); 

如預期在單個線程其中一期工程:

flux.subscribe(System.out::println); 

輸出是

0 1 2 3 state: 4 

但是,當我切換到並聯:

flux.parallel().runOn(Schedulers.elastic()).subscribe(System.out::println); 

消費者應該打印狀態:號碼不被調用。我只看到:

0 3 2 1 

它是一個錯誤或預期的功能?

回答

1

我不是一個被動的專家,但在挖掘了源代碼之後,似乎這種行爲是通過設計的;似乎創建一個ParallelFlux具有阻止州消費者呼叫的副作用;如果你想要平行並得到國家消費者援引,你可以使用:

flux.publishOn(Schedulers.elastic()).subscribe(System.out::println); 
+0

明白了。但是,當你創建一個Flux並期望得到一些結果但在代碼中的某個地方使用它會改變這種行爲時,這種奇怪的行爲。 –