2017-01-17 97 views
2

嘗試進行升級時遇到問題。將io.projectreactor版本從2.0.x升級到3.0.4 - 使用Spring框架

我目前使用2.0.x版本,特別是 -

reactor.bus 
reactor.rx.Stream 
reactor.rx.Streams 
reactor.core.processor.RingBufferProcessor 
reactor.fn.Consumer 

我使用maven,和我有一個關於 'projectreactor' 單一依賴 -

<groupId>io.projectreactor.spring</groupId> 
<artifactId>reactor-spring-core</artifactId> 

當升級到版本3.0.4.RELEASE,爲了繼續使用我之前使用過的所有東西,我需要明確導入 -

<groupId>io.projectreactor</groupId> 
<artifactId>reactor-bus</artifactId> 

而且

<groupId>io.projectreactor</groupId> 
<artifactId>reactor-stream</artifactId> 

但我仍然失蹤

reactor.core.processor.RingBufferProcessor 
reactor.fn.Consumer 

,我不知道該怎麼做。

回答

0
reactor.rx.Stream -> reactor.core.publisher.Flux 
reactor.rx.Streams -> reactor.core.publisher.Flux 
reactor.rx.Promise -> reactor.core.publisher.Mono and reactor.core.publisher.MonoProcessor 
reactor.core.processor.RingBufferProcessor -> reactor.core.publisher.TopicProcessor 
reactor.fn.Consumer -> java.until.function.Consumer (Java 8) 

沒有新的彈簧模塊,因爲彈簧5直接包含這些新類型的反應器支持。

至於reactor-bus: 按照設計,現在所有的流路徑(Flux/Mono鏈)都是鍵入的,所以動態路由並不是我們功能的一部分。還是有一個類型化的方式替代,比如:

ReplayProcessor<MyEvent> rp = ReplayProcessor.create(); 
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev)); 
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev)); 
Flux<MyEvent> interest1_2 = rp.filter(ev -> filterInterest1(ev) || filterInterest2(ev)); 

interest1.subscribe(doSomethingForInterest1); 
interest2.subscribe(doSomethingForInterest2); 
interest1_2.subscribe(doSomethingForInterest1_2); 

rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react 
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react 
rp.onNext(new MyEvent("interest2")); //subscriber 2 and 3 react 
rp.onNext(new MyEvent("interest4")); // buffered until interest subscriber because ReplayProcessor 

//shutdown/cleanup/close 
rp.onComplete(); 

我發現這個在github上,這似乎滿足您的需求

1

reactor.fn.Consumer被Java 8 java.util.function.Consumer所取代。

至於RingBufferProcessor你必須選擇一個new processors所有使用環形緩衝區。

Dispatcher s現在Schedulers使用Java的Executor s在引擎蓋下。

+0

什麼abount RingBufferDispatcher?我正在使用EventBus,似乎所有的東西都會很快被棄用。讓我想到這個庫。 –

+0

@AdamBerlin我已經添加了關於調度員的註釋。不確定EventBus,但是核心庫已經放棄了許多自定義實現轉移到標準Java 8類。 –

+0

@AdmBerlin注意到3.x與2.x相比是一個非常大的範式轉換,現在更加關注反應式編程和反應流規範。 –