2017-02-16 16 views
19

我正在使用RsJS 5(5.0.1)在Angular 2中緩存。它運行良好。HTTP服務的反應緩存

緩存功能的肉:

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey)) 
) 
    .publishReplay(1, this.RECACHE_INTERVAL) 
    .refCount().take(1) 
    .do(() => this.console.log('CACHE HIT', cacheKey)); 

actualFnthis.http.get('/some/resource')

就像我說的,這對我來說是完美的。高速緩存從RECACHE_INTERVAL的持續時間的可觀察值返回。如果在該間隔後發出請求,則將調用actualFn()

我想知道的是當RECACHE_INTERVAL到期並且actualFn()被稱爲—如何返回最後一個值。在RECACHE_INTERVAL到期和重播actualFn()之間有一段時間,即observable沒有返回值。我希望能夠及時擺脫這種差距,並始終返回最後的價值。

我可以使用一個副作用,並在等待HTTP響應返回時存儲最後的良好值調用.next(lastValue),但這看起來很樸素。如果可能,我想使用「RxJS」方式,純功能解決方案—。

回答

2

更新答案:

如果總是希望使用以前的值,而正在取得新的請求,那麼可以把它保留最近的價值鏈中的另一個主題。

然後,您可以重複該值,以便可以判斷它是否來自緩存。然後用戶可以過濾出緩存的值,如果他們不感興趣的話。

// Take values while they pass the predicate, then return one more 
// i.e also return the first value which returned false 
const takeWhileInclusive = predicate => src => 
    src 
    .flatMap(v => Observable.from([v, v])) 
    .takeWhile((v, index) => 
    index % 2 === 0 ? true : predicate(v, index) 
) 
    .filter((v, index) => index % 2 !== 1); 

// Source observable will still push its values into the subject 
// even after the subscriber unsubscribes 
const keepHot = subject => src => 
    Observable.create(subscriber => { 
    src.subscribe(subject); 

    return subject.subscribe(subscriber); 
    }); 

const cachedRequest = request 
    // Subjects below only store the most recent value 
    // so make sure most recent is marked as 'fromCache' 
    .flatMap(v => Observable.from([ 
    {fromCache: false, value: v}, 
    {fromCache: true, value: v} 
    ])) 
    // Never complete subject 
    .concat(Observable.never()) 
    // backup cache while new request is in progress 
    .let(keepHot(new ReplaySubject(1))) 
    // main cache with expiry time 
    .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL))) 
    .publish() 
    .refCount() 
    .let(takeWhileInclusive(v => v.fromCache)); 

    // Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL 
    // Subscribers will get the most recent cached value first then an updated value 

https://acutmore.jsbin.com/kekevib/8/edit?js,console

原來的答覆:

在replaySubject設置窗口大小,而不是的 - 你可以更改源觀察到的延遲後重復。

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey)) 
) 
    .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL)) 
    .publishReplay(1) 
    .refCount() 
    .take(1) 
    .do(() => this.console.log('CACHE HIT', cacheKey)); 

repeatWhen運營商需要RxJs-beta12或更高 https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09

+0

感謝您的建議,這是非常好的。有兩個問題:我需要從鏈中刪除'.take(1)'運算符,否則它將不會再發出第二個HTTP請求。第二個問題是我希望在RECACHE_INTERVAL之後調用另一個subsribe()時觸發recache,而不是每個RECACHE_INTERVAL。 – Martin

+0

@Martin我已經更新了我的答案,以便只有在RECACHE_INTERVAL後另一個訂閱者訂閱時纔會發出請求。希望這有助於幫助 – Ashley

2

看到這個演示:https://jsbin.com/todude/10/edit?js,console

注意到,我在1200ms時的情況下是無效的,然後試圖讓緩存結果在1300ms時,前一個請求仍然未決(需要200ms)。兩個結果都應該收到。

這是因爲,當您訂閱和publishReplay()不包含任何有效的數值也不會發出任何東西,也不會立即完成(感謝take(1)),所以它需要訂閱的源這使得HTTP請求(這實際上發生在refCount())。

然後第二個用戶也不會收到任何東西,並將被添加到publishReplay()的觀察員陣列中。它不會再訂閱,因爲它已經訂閱了它的來源(refCount()),並且正在等待迴應。

所以你所描述的情況不應該發生我認爲。最終制作一個演示,演示您的問題。

編輯

發射都失效的項目和新項目

下面的例子顯示出比聯例子有點不同的功能。如果緩存的響應無效,則無論如何都會發射,然後它也會收到新的值。這意味着用戶接收的一個或兩個值:

  • 1值:高速緩存的值
  • 2的值:該失效的緩存值,然後新的那會從現在起被高速緩存新鮮值。

的代碼可能如下所示:

let counter = 1; 
const RECACHE_INTERVAL = 1000; 

function mockDataFetch() { 
    return Observable.of(counter++) 
    .delay(200); 
} 

let source = Observable.defer(() => { 
    const now = (new Date()).getTime(); 

    return mockDataFetch() 
    .map(response => { 
     return { 
     'timestamp': now, 
     'response': response, 
     }; 
    }); 
}); 

let updateRequest = source 
    .publishReplay(1) 
    .refCount() 
    .concatMap(value => { 
    if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { 
     return Observable.from([value.response, null]); 
    } else { 
     return Observable.of(value.response); 
    } 
    }) 
    .takeWhile(value => value); 


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500); 

見現場演示:https://jsbin.com/ketemi/2/edit?js,console

這將打印到控制檯輸出如下:

Response 0: 1 
Response 50: 1 
Response 200: 1 
Response 1200: 1 
Response 1300: 1 
Response 1200: 2 
Response 1300: 2 
Response 1500: 2 
Response 3500: 2 
Response 3500: 3 

通知12001300收到先將舊的緩存值1立即a然後用新值2計算另一個值。
另一方面,1500只收到新值,因爲2已被緩存且有效。

最容易混淆的事情可能是爲什麼我使用concatMap().takeWhile()。這是因爲我需要確保新的響應(不是無效的)是發送完整通知之前的最後一個值,並且可能沒有運營商(對於此用例,first()takeWhile()都不適用)。

僅發射當前項目,而不必等待刷新

另一種使用情況可能是,當我們想在不等待來自HTTP請求響應新的發射只緩存值。

let counter = 1; 
const RECACHE_INTERVAL = 1000; 

function mockDataFetch() { 
    return Observable.of(counter++) 
    .delay(200); 
} 

let source = Observable.defer(() => { 
    const now = (new Date()).getTime(); 

    return mockDataFetch() 
    .map(response => { 
     return { 
     'timestamp': now, 
     'response': response, 
     }; 
    }); 
}); 

let updateRequest = source 
    .publishReplay(1) 
    .refCount() 
    .concatMap((value, i) => { 
    if (i === 0) { 
     if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid? 
     return Observable.from([value.response, null]); 
     } else { 
     return Observable.of(value.response); 
     } 
    } 
    return Observable.of(null); 
    }) 
    .takeWhile(value => value); 


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800); 

見現場演示:https://jsbin.com/kebapu/2/edit?js,console

這個例子打印到控制檯:

Response 0: 1 
Response 50: 1 
Response 200: 1 
Response 1200: 1 
Response 1300: 1 
Response 1500: 2 
Response 3500: 2 
Response 3800: 3 

注意兩個12001300獲得價值1,因爲這是緩存值,即使它現在是無效的。第一次調用1200只是產生一個新的HTTP請求,而不等待它的響應並且只發出緩存的值。然後在1500新鮮的價值被緩存,所以它只是重新發布。這同樣適用於35003800

注意,油1200用戶會立即收到通知next但HTTP請求完成後,才complete通知將被髮送。我們需要等待,因爲如果我們在next之後立即發送complete,它會使鏈處置其一次性消息,這也應該取消HTTP請求(這是我們絕對不想做的)。

+0

Martin,謝謝你的回答。我完全瞭解我目前的實施情況。我不清楚的是最後一句 - 「情況」是指你的行爲還是我現有的工作行爲? – Martin

+0

@Martin我不好,我想寫「永遠不會發生」。我不認爲你可以有一個空的窗口,你的用戶不會收到任何東西。 – martin

+0

好的,我明白了 - 我的問題還不夠具體。當新觀察者在RECACHE_INTERVAL之後訂閱時發生'間隙'。我會更新這個問題來澄清。謝謝。 – Martin

2

如果您使用普通的rxjs,幾乎任何複雜的邏輯都會很快失去控制。我寧願從零開始實施定製cache運營商,您可以使用此gist作爲示例。