2017-07-30 54 views
0

我想使用RxJava 1.1.5與Spring WebFlux(即反應堆核心3.1.0.M3),但我無法適應ObservableFlux適應RxJava 1.1.5反應堆核心3.1.0.M3

我想這將是相對簡單的,但我的適配器不工作:

import reactor.core.publisher.Flux; 
import rx.Observable; 
import rx.Subscriber; 
import rx.Subscription; 

public static <T> Flux<T> toFlux(Observable<T> observable) { 
    return Flux.create(emitter -> { 
     final Subscription subscription = observable.subscribe(new Subscriber<T>() { 
      @Override 
      public void onNext(T value) { 
       emitter.next(value); 
      } 
      @Override 
      public void onCompleted() { 
       emitter.complete(); 
      } 
      @Override 
      public void onError(Throwable throwable) { 
       emitter.error(throwable); 
      } 
     }); 
     emitter.onDispose(subscription::unsubscribe); 
    }); 
} 

我已驗證onNextonCompleted都獲取調用正確的順序,但我Flux總是空空的。有沒有人看到我在做什麼錯了?

在相關說明中,爲什麼在reactor-addons中沒有用於RxJava 1的適配器?

回答

3

使用RxJavaReactiveStreams適配器將您的Observable變成Publisher,然後Flux.fromPublisher()消耗它。

compile 'io.reactivex:rxjava-reactive-streams:1.2.1' 

Observable<T> o = ... 

Flux.from(RxReactiveStreams.toPublisher(o)); 

在一個相關的說明,爲什麼沒有在反應堆插件RxJava 1適配器?

他們不想支持或鼓勵使用這項舊技術,我完全同意。

+0

在akarnokd之前不敢稱它爲「老」:p但是這就是精神:Rx1到發佈者的適配器路徑非常直截了當,提供直接適配器並不值得麻煩,再加上Reactor想要鼓勵使用反應性流 –

+0

無論如何,這沒有什麼竅門。謝謝你,先生。 –