2016-09-06 121 views

回答

4

您可以使用ConnectedObservable的功能來處理這個要求:

//Replace Observable.range(1,1000) with your Observable implementation 
Observable.range(1, 1000).doOnUnsubscribe(() -> freeResources()).share(); 

share方法調用方法publishrefCount

publish將您的「普通」Observable轉換爲ConnectedObservable,當您撥打connect時,它將開始發射物品。因此,您可以在技術上訂閱儘可能多的觀察者,然後致電connect同時開始爲他們全部發布物品。

refCount將您的ConnectedObservable再次轉換回傳統的,但具有新的特點!附加的好處是:這個可觀察的現在是冷的(只有當用戶訂閱時纔開始發射,在內部它調用publish創建的原始ConnectedObservable的connect方法),並跟蹤有多少用戶連接到原始ConnectedObservable。一旦所有訂戶都取消訂閱,它將從源代碼ConnectedObservable中取消綁定,因此邏輯變得更簡單,因爲您只需處理一個訂閱。

沒有爲共享操作這裏的好圖:http://reactivex.io/RxJava/javadoc/rx/Observable.html#share()

或者,如果這是不夠靈活,我想你應該能夠實現這種行爲很容易爲了使用defer創建冷觀察到,以及doOnSubscribedoOnUnsubscribe方法。

例子:

Observable.defer(() -> { 
     final AtomicInteger counter = new AtomicInteger(); 
     return Observable.range(1, 1000) 
       .doOnSubscribe(() -> counter.incrementAndGet()) 
       .doOnUnsubscribe(() -> { 
        if (counter.decrementAndGet() == 0) { 
         freeResources(); 
        } 
       }); 
    }); 

此觀察到將開始發射數字序列(與你觀察的實現來替代這一點)只要第一用戶訂閱,它會增加與每個訂閱一個計數器,並釋放一旦所有訂閱者都取消訂閱,就會使用資源(取代freeResources以滿足您的任何需求)。

相關問題