2016-12-07 121 views
0

我有一個可觀察的創建成本很高,所以我的shared它。然而,在某些情況下,所有訂戶都會取消訂閱,然後立即(或在短暫延遲後)新訂閱者訂閱。rxjs5:延遲取消訂閱共享可觀察者

實際觀察到的太複雜,這裏複製的,但對於參數的緣故:

const heavyObservable = Rx.Observable.create((observer) => { 
    console.log('I am expensive, avoid hitting this code'); 

    return Rx.Observable 
      .interval(500) // these updates are cheap though! 
      .subscribe(observer) 
       .add(() => { 
        console.log('Cache has been destroyed, will have to be rebuild on next call'); 
       }); 
}); 

我不想打了創建這個觀察到昂貴的代碼。我想延遲斷開,直到n ms。有沒有辦法做到這一點?

const sharedObservable = heavyObservable 
    .publish() 
    // ideally I'm looking for a way to get refCount to wait for new 
    // subscribers for n ms before unsubscribing when refcount === 0 
    .refCount(); 

// calling subscribe here invokes heavyObservable which can take a bit of time 
const subscription1 = sharedObservable.subscribe(); 
// log: I am expensive, avoid hitting this code 

// second call is "free" - the underlying observable is reused 
const subscription2 = sharedObservable.subscribe(); 

subscription1.unsubscribe(); 
subscription2.unsubscribe(); 

// calling subscribe again here invokes heavyObservable over again 
const subscription3 = sharedObservable.subscribe(); 
// log: I am expensive, avoid hitting this code 

回答

0

當沒有完成退訂,再沒有新的數據將被髮射(除非是在流,這不是明顯的在你的問題的開始觸發)。 - 您的案例中的subscription1subscription2應該收到相同的值。 如果這是設計,那麼你可以不使用refCount(),但只是發佈,然後做sharedObservable.connect(),在這種情況下,它總是「熱」。 另一種選擇可能是publishReplay(1)而不是publish()

以任何方式,您的情況聽起來有點奇怪,最有可能通過改變數據流的一般體系結構來解決 - 但是,如果不知道真實用例,很難說出哪些rxjs操作這裏是最好的。

+0

有關調查可觀察到的數據,並將其轉換,並保持它是最新的。只要變換後的數據發生變化,它就會發射。這個轉換最初很昂貴,但維護起來很便宜。我可以使用連接,這將保持服務器連接永久打開。如果可能的話,我想斷開連接。大多數情況下,這工作得很好,但我有一些邊界情況下快速退訂和重新結束。我可以在調用者中通過添加一個明確的,長期存在的sub來處理這個問題,但是這是一個需要記住要做的和清理的額外事情。我寧願有可觀察的處理訂閱「平滑」 – studds

+0

您可以發佈該流嗎? - 如果沒有完整的圖片,幾乎不可能提供適當的幫助(當然,你可以改變網址等。) – olsn

+0

我很喜歡,但它太大而無法在這裏複製。顯然,如果我可以讓代碼更便宜運行或避免運行代碼,那最好,但這超出了堆棧溢出問題的範圍,我想。我添加了一個micky-mouse示例來嘗試並更好地說明目標。 – studds

0

試圖解決這個問題。下面的函數包裝提供的ConnectableObservable source並維護用戶的refCount。當第一用戶訂閱時,它呼叫connect(),然後當最後一個用戶退訂delayms後,從source呼叫setTimeoutunsubscribes

理想情況下,我寧願修改現有的refCount observable,但我不明白代碼是誠實的。

不知道這是否涵蓋所有可能的邊緣情況或是否會產生意想不到的副作用。

Plunker:https://jsbin.com/wafahusitu/edit?js,console

function refCountWithUnsubscriptionDelay<T>(source: Rx.ConnectableObservable<T>, delay: number): Rx.Observable<T> { 

    const refCount = 0; 
    const sub; 
    let timeoutRef; 

    return Rx.Observable.create((observer: Rx.Observer<T>) => { 
     refCount++; 
     if (timeoutRef) { 
      clearTimeout(timeoutRef); 
     } 
     console.log('refCount = ' + refCount); 
     if (!sub) { 
      // connect on first call 
      sub = source.connect(); 
     } 

     return source.subscribe(observer) 
       .add(function() { 
        refCount --; 
        if (refCount <= 0) { 
         // trigger delayed unsubscription if there are no listeners 
         timeoutRef = setTimeout(() => { 
          // don't unsubscribe if new listeners have subscribed 
          if (refCount <= 0) { 
           console.log('unsub'); 
           sub.unsubscribe(); 
           sub = undefined; 
           timeoutRef = undefined; 
          } 
         }, delay); 
        } 
       }); 
    }) 
} 
+0

完全誠實:這看起來不像是一種解決方案,應該與其他人共享 - 它可能對你「有點」起作用 - 但它肯定不是rxjs打算工作的方式 - 而且我絕對相信這一點可以通過爲「沉重」的流使用不同的設置來避免 - 但是如果沒有看到那個流,就不可能幫助你 - 爲了「拯救」可能偶然遇到這個問題的其他用戶:請刪除你的答案因爲它對你的情況來說太具體了,並且使用非rxjs實踐。 – olsn

+0

謝謝olsn。您能否更具體地瞭解非rxjs的做法?或者是否有一些引用可以指向我?你似乎強烈地感覺這是一個不好的解決方案。如果你能夠提供更多的指導,爲什麼這麼糟糕,這將有助於我的學習。 – studds

+0

改變訂閱機制就像購買扳手並修改它成爲一把刀 - 如果你不能發佈你的流,也許你可以在幾個步驟中解釋你的流應該做什麼,什麼時候進行訂閱和預期什麼數據在訂閱內(請隨時在gmail的pure.onh上給我留言) – olsn