2017-10-12 41 views
0

我有用於套接字通信的自定義Rx適配器。 在它之外,我觀察到Flowable帶有消息。 然後我有一些經理處理每個消息,然後進一步發佈它。與多個用戶一次呼叫流的一部分?

 fun observeSocket() = socketManager 
       .observe() 
       .doOnNext{ 
        insideMessageHandler.handle(it) 
       } 

然後,我有兩個用戶,做observeSocket()。認購()

的問題是,與每一個消息insideMessageHandler.handle(它)被調用兩次。我想找到流的一部分將爲每個用戶共同的方式。不幸的是,在observeSocket()結尾處的.share()運算符不起作用。

我有這樣的事情了兩次:

  /onNextInside 
Flowable/-onNextOutsideSubscriber1 
Flowable\-onNextOutsideSubscriber2 
     \-onNextInside 

而且我希望有這樣的事情:

  /-onNextInside 
Flowable/-onNextOutsideSubscriber1 
     \-onNextOutsideSubscriber2 

在代碼中,它看起來像

insideManager.observeSocket().subscribe({do something first}) 
insideManager.observeSocket().subscribe({do something second}) 

的問題是,在這種情況下,我有onNextInside調用兩次

這是可能的嗎?

回答

2

的問題是重新創建觀察到的:

fun observeSocket() = socketManager 
     .observe() 
     .doOnNext{ 
      insideMessageHandler.handle(it) 
     } 

隨着每次調用observeSocket()您創建一個新的鏈條,所以把share()就不會有所作爲。

而是定義這個鏈條爲共享單:

private val _observeSocket = socketManager 
     .observe() 
     .doOnNext{ 
      insideMessageHandler.handle(it) 
     } 
     .share() 


fun observeSocket() = _observeSocket 
+1

就像答案一樣;-) –

+0

太棒了,它的工作原理幾乎完美!改變了運營商的訂單,現在它確切地是我想要的。謝謝! –

0

對於這種情況你有主題。

var yourSubject = PublishSubject<Type>.create() 
fun observeSocket() = socketManager.doOnNext(yourSubject::onNext) 

然後,您可以觀察您的observeSocket()和另一個/多個訂閱者的主題。

如果你想分享一個觀測值,你可以使用共享()運算符,它可以確保即使多個用戶訂閱只用創建一個實例,並共用發射數據的發射

fun observeSocket() = socketManager 
      .doOnNext(insideMessageHandler::handle) 
      .share() 

無論如何,這不是一個很好的用於反應的東西,爲了這個目的,平坦/發射到doOnNext中的另一個觀察者。

更好地共享一個流動的,並且訂閱多次,flatmap值你的需要而不是把一切都在一個流動的,因爲如果整個流可能不會發出任何更多的數據流中的一個錯誤。

+0

doOnNext不排放任何 它只是在做與對象的東西,例如登錄它 我會改正我的畫:) –

+0

好吧,持有上。我在幾分鐘內檢查。你能否提供樣本數據,因爲它不是很清楚你要求什麼 –

相關問題