2016-05-09 83 views
0

我想通過自己觀察到的片流,例如:通過觀察到自身

val source = Observable.from(1 to 10).share 
val boundaries = source.filter(_ % 3 == 0) 
val result = source.tumblingBuffer(boundaries) 

result.subscribe((buf) => println(buf.toString)) 

特輸出控制觀察到的緩衝是:

Buffer() 
Buffer() 
Buffer() 
Buffer() 

source可能是迭代的boundaries線,前它達到result,所以它只創建邊界和產生的緩衝區,但沒有什麼可填寫的。

我對此的方法使用publish/connect

val source2 = Observable.from(1 to 10).publish 
val boundaries2 = source2.filter(_ % 3 == 0) 
val result2 = source2.tumblingBuffer(boundaries2) 

result2.subscribe((buf) => println(buf.toString)) 
source2.connect 

這將產生輸出好嗎:

Buffer(1, 2) 
Buffer(3, 4, 5) 
Buffer(6, 7, 8) 
Buffer(9, 10) 

現在我只需要隱藏來自外部世界connectconnect它的時候result被認購(我是一個類的內部這樣做我不想暴露它)。喜歡的東西:

val source3 = Observable.from(1 to 10).publish 
val boundaries3 = source3.filter(_ % 3 == 0) 
val result3 = source3 
      .tumblingBuffer(boundaries3) 
      .doOnSubscribe(() => source3.connect) 

result3.subscribe((buf) => println(buf.toString)) 

但現在,doOnSubscribe行動得到從來沒有所謂所以公佈source得到從未連接...

有什麼不對?

回答

1

您的publish解決方案符合您的要求。然而,還有一個替代publish運算符,它將lambda作爲參數(see documentation),類型爲Observable[T] => Observable[R]。這個lambda的參數是原始流,您可以安全地訂閱多次。在lambda中,您可以根據自己的喜好轉換原始流;在你的情況下,你過濾流緩衝它在該過濾器上。

Observable.from(1 to 10) 
    .publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0))) 
    .subscribe(buf => println(buf.toString())) 

該運算符的最好的事情是,你不需要調用任何事情之後像connect

+0

是的,這可能是我錯過的,謝謝! –