2016-11-19 37 views
2

基本上,我們有一個生產者一次產生一個隨機數和幾個睡覺1秒鐘然後打印一個數字的消費者。如何在rxjava中實現平衡扇出?

每個消費者都是獨家的,每個號碼只能有一個接收者。
此行爲與java中的JMS隊列或BlockingQueue類似。

在阿卡流,我能找到

balance[T] – (1 input, N outputs) given an input element emits to one of its output ports. 

但我無法找到任何內置組件rxjava做同樣的工作。
Observable總是向所有觀察者廣播消息,如pub-sub風格。我應該怎麼做,如果我需要queue風格。

我想念什麼?

+0

你的用例是什麼?你想並行處理元素嗎? – akarnokd

+0

是的,我想要並行處理元素。我可以通過多線程和阻塞隊列來做到這一點,但我不知道是否讓這個更優雅。 生產者正在發佈從數據庫中提取的事件,以及一大堆耗時費力處理這些事件的消費者。這些消費者完全一樣。 –

回答

1

我認爲你所擁有的心智模式並不能真正匹配Rx所建立的思維模式 - 思考許多小操作的流,而不是大流量組件之間的消息。

我建議一)加蓋的線程池B)的RX調度周圍,然後C:

databaseSource 
.fetchItems() 
.flatMap(item -> 
    Obsevable.just(item) 
    .observeOn(cappedThreadScheduler) 
    .map(item -> longRunningOperation(item)) 
) 

OTOH,你可以像這樣做太:

databaseSource 
.fetchItems() 
.flatMap(item -> 
    Obsevable.just(item) 
    .observeOn(schedulers.io()) 
    .map(item -> longRunningOperation(item)) 
    , 16 
) 

要最多有16個操作並行運行。

+0

嗨,我更新我的代碼http://stackoverflow.com/questions/40701696/is-it-ok-to-transform-following-code-in-rxjava,你可以幫助在可運行的代碼? –