0
我想通過自己觀察到的片流,例如:通過觀察到自身
val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)
result.subscribe((buf) => println(buf.toString))
特輸出控制觀察到的緩衝是:
Buffer()
Buffer()
Buffer()
Buffer()
source
可能是迭代的boundaries
線,前它達到result
,所以它只創建邊界和產生的緩衝區,但沒有什麼可填寫的。
我對此的方法使用publish
/connect
:
val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)
result2.subscribe((buf) => println(buf.toString))
source2.connect
這將產生輸出好嗎:
Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)
現在我只需要隱藏來自外部世界connect
和connect
它的時候result
被認購(我是一個類的內部這樣做我不想暴露它)。喜歡的東西:
val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
.tumblingBuffer(boundaries3)
.doOnSubscribe(() => source3.connect)
result3.subscribe((buf) => println(buf.toString))
但現在,doOnSubscribe
行動得到從來沒有所謂所以公佈source
得到從未連接...
有什麼不對?
是的,這可能是我錯過的,謝謝! –