我有Observable調用消息,其中包含幾條消息。我想同時處理這些消息。我可以如何使用rxJava?如何在並行線程中執行observable
消息。(代碼以執行觀測的物品平行).subscribe(MSG->處理(MSG))
(如果可觀察到的包含五個不同的消息然後我需要在五個單獨的線程來處理這五個消息)
我有Observable調用消息,其中包含幾條消息。我想同時處理這些消息。我可以如何使用rxJava?如何在並行線程中執行observable
消息。(代碼以執行觀測的物品平行).subscribe(MSG->處理(MSG))
(如果可觀察到的包含五個不同的消息然後我需要在五個單獨的線程來處理這五個消息)
如果你想留在Observable
世界裏,你可以flatMap
與subscribeOn
和計算每個元素要在並行:
Observable.range(1, 10)
.flatMap(v ->
Observable.fromCallable(() -> compute(v))
.subscribeOn(Schedulers.computation)
)
.subscribe(e -> { }, Throwable::printStackTrace);
,如果你想讓結果保持給定的順序,那麼你可以使用'concatMapEager'而不是'flatMap'並保持併發性。 – Andy
運行單個線程「觀察」您的消息,並將其中包含的每條消息分配給提交給某種workers thread pool的新消息處理任務(例如,簡單的Runnable
)。
在這裏,你會發現簡單的操作方法:https://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html
這聽起來像多線程和NIO反應模型的混合。你確定這將是一件好事嗎? – duffymo
訂閱5 Observable的觀察者? – gaston