2016-11-13 85 views
0

我有一個org.reactivestreams.Processor,我希望在RxJava 2.0中使用它。然而,儘管有轉換來將org.reactivestreams.Publisher與RxJava集成,如io.reactivex.Flowable#fromPublisher,但我不清楚如何最好地集成org.reactivestreams.Processor(或org.reactivestreams.Subscriber)。任何人都可以對此發光一些?在RxJava 2.0中使用Reactive-Streams處理器

回答

0

您纏繞Publisher側並保持Subscriber一側是:

Processor proc = ... 

Subscriber sub = proc; 
Flowable flow = Flowable.fromPublisher(proc); 

flow.map(v -> v.toString()).subscribe(System.out::println); 

sub.onNext(1); 
+0

嗯,但後來我可能違反反應流合同,指定了'onNext'不能更經常被稱爲然後通過請求'訂閱#請求(長)'。 –

+0

這取決於您獲得該處理器的位置,還是它協調下游請求與否。 RxJava的處理器不會協調,如果您向他們發送Subscription,他們總是會請求Long.MAX_VALUE。 – akarnokd

+0

處理器確認[spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#specification),並且已成功與akka流集成。我認爲與RxJava 1.x不同,RxJava 2.0支持背壓... –