我有用於套接字通信的自定義Rx適配器。 在它之外,我觀察到Flowable帶有消息。 然後我有一些經理處理每個消息,然後進一步發佈它。與多個用戶一次呼叫流的一部分?
fun observeSocket() = socketManager
.observe()
.doOnNext{
insideMessageHandler.handle(it)
}
然後,我有兩個用戶,做observeSocket()。認購()
的問題是,與每一個消息insideMessageHandler.handle(它)被調用兩次。我想找到流的一部分將爲每個用戶共同的方式。不幸的是,在observeSocket()結尾處的.share()運算符不起作用。
我有這樣的事情了兩次:
/onNextInside
Flowable/-onNextOutsideSubscriber1
Flowable\-onNextOutsideSubscriber2
\-onNextInside
而且我希望有這樣的事情:
/-onNextInside
Flowable/-onNextOutsideSubscriber1
\-onNextOutsideSubscriber2
在代碼中,它看起來像
insideManager.observeSocket().subscribe({do something first})
insideManager.observeSocket().subscribe({do something second})
的問題是,在這種情況下,我有onNextInside調用兩次
這是可能的嗎?
就像答案一樣;-) –
太棒了,它的工作原理幾乎完美!改變了運營商的訂單,現在它確切地是我想要的。謝謝! –