2017-03-05 75 views
0

我實際上使用的是Scala,但這個問題對所有的Rx和流式框架都是通用的。我的用例是我有一個生成的observable(因此冷),我想多個消費者並行使用完全相同的值,但我希望他們有顯着不同的吞吐量。如何播放一個冷觀察:用背壓重播?

我需要的可以通過廣播一個可重放的observable來完成,但是我發現帶有最大緩衝區大小的重播策略是從緩衝區中溢出元素(然後爲最慢的消費者丟失)而不是反壓制片商。如果你把所有廣播的觀察對象視爲熱點,這是有道理的,但在我的情況下,我知道它實際上很冷,並且可能受到背壓。

是否有某種方法可以在任何符合JVM反應流的框架中實現這一點?

非常感謝!

回答

0

RxJava通過publish運算符支持這個操作符,它可以協調來自單個消費者的請求,也就是說,它以固定的速率請求最慢的消費者請求。不幸的是,沒有RxScala 2目前只有RxJava 2支持無流規範,因此,你可能有一點不便的把這個變成斯卡拉:

Flowable.fromPublisher(Flowable.range(1, 1000)) 
.publish(f -> 
    Flowable.mergeArray(
     f.observeOn(Schedulers.computation()).map(v -> v * v), 
     f.observeOn(Schedulers.computation()).map(v -> v * v * v) 
    ) 
) 
.blockingSubscribe(System.out::println); 

另一種方法是使用ConnectableObservable並連接手動一旦所有消費者已經訂閱:

ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000)) 
    .publish(); 

co.observeOn(Schedulers.computation()).map(v -> v * v) 
    .subscribe(System.out::println); 

co.connect(); 
+0

感謝您的意見。在您回答之後,我還注意到akka流通過BroadcastHub提供了類似的功能: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html –