我有一個Observable
,我想要冷,也就是說,它應該只在第一個觀察者訂閱它時纔開始發射物品。所有觀察員都退訂Observable是否有事件?
然後,我想確保在所有觀察者都取消訂閱同一個observable時從源中釋放所有資源。那可能嗎?
我有一個Observable
,我想要冷,也就是說,它應該只在第一個觀察者訂閱它時纔開始發射物品。所有觀察員都退訂Observable是否有事件?
然後,我想確保在所有觀察者都取消訂閱同一個observable時從源中釋放所有資源。那可能嗎?
您可以使用ConnectedObservable的功能來處理這個要求:
//Replace Observable.range(1,1000) with your Observable implementation
Observable.range(1, 1000).doOnUnsubscribe(() -> freeResources()).share();
share
方法調用方法publish
和refCount
。
publish
將您的「普通」Observable轉換爲ConnectedObservable,當您撥打connect
時,它將開始發射物品。因此,您可以在技術上訂閱儘可能多的觀察者,然後致電connect
同時開始爲他們全部發布物品。
refCount
將您的ConnectedObservable再次轉換回傳統的,但具有新的特點!附加的好處是:這個可觀察的現在是冷的(只有當用戶訂閱時纔開始發射,在內部它調用publish
創建的原始ConnectedObservable的connect
方法),並跟蹤有多少用戶連接到原始ConnectedObservable。一旦所有訂戶都取消訂閱,它將從源代碼ConnectedObservable中取消綁定,因此邏輯變得更簡單,因爲您只需處理一個訂閱。
沒有爲共享操作這裏的好圖:http://reactivex.io/RxJava/javadoc/rx/Observable.html#share()
或者,如果這是不夠靈活,我想你應該能夠實現這種行爲很容易爲了使用defer
創建冷觀察到,以及doOnSubscribe
和doOnUnsubscribe
方法。
例子:
Observable.defer(() -> {
final AtomicInteger counter = new AtomicInteger();
return Observable.range(1, 1000)
.doOnSubscribe(() -> counter.incrementAndGet())
.doOnUnsubscribe(() -> {
if (counter.decrementAndGet() == 0) {
freeResources();
}
});
});
此觀察到將開始發射數字序列(與你觀察的實現來替代這一點)只要第一用戶訂閱,它會增加與每個訂閱一個計數器,並釋放一旦所有訂閱者都取消訂閱,就會使用資源(取代freeResources以滿足您的任何需求)。