0
我想使用RxJava 1.1.5與Spring WebFlux(即反應堆核心3.1.0.M3),但我無法適應Observable
Flux
。適應RxJava 1.1.5反應堆核心3.1.0.M3
我想這將是相對簡單的,但我的適配器不工作:
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
public static <T> Flux<T> toFlux(Observable<T> observable) {
return Flux.create(emitter -> {
final Subscription subscription = observable.subscribe(new Subscriber<T>() {
@Override
public void onNext(T value) {
emitter.next(value);
}
@Override
public void onCompleted() {
emitter.complete();
}
@Override
public void onError(Throwable throwable) {
emitter.error(throwable);
}
});
emitter.onDispose(subscription::unsubscribe);
});
}
我已驗證onNext
和onCompleted
都獲取調用正確的順序,但我Flux
總是空空的。有沒有人看到我在做什麼錯了?
在相關說明中,爲什麼在reactor-addons中沒有用於RxJava 1的適配器?
在akarnokd之前不敢稱它爲「老」:p但是這就是精神:Rx1到發佈者的適配器路徑非常直截了當,提供直接適配器並不值得麻煩,再加上Reactor想要鼓勵使用反應性流 –
無論如何,這沒有什麼竅門。謝謝你,先生。 –