2016-11-08 95 views
2

我需要多次查詢設備。每個查詢都需要是異步的,並且設備一次不支持同時查詢。 此外,一旦查詢,不能立即再次查詢。它需要至少1秒的暫停才能正常工作。爲什麼在完成連接之後不會調用完成?

我的兩個查詢,由saveClock()saveConfig()執行,返回一個Promise,並通過返回undefined來解決,如預期的那樣。

在下面的代碼中爲什麼要刪除take()阻止toArray()被調用?
這裏發生了什麼,是否有更好的方法來實現相同的行爲?

export const saveEpic = (action$, store) => 
    action$.ofType(SAVE) 
    .map(action => { 
     // access store and create object data 
     // ... 
     return data; 
    }) 
    .mergeMap(data => 
     Rx.Observable.from([ 
     Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)), 
     Rx.Observable.timer(1000), 
     Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)), 
     Rx.Observable.of(data.id) 
    ]) 
    ) 
    .concatAll() 
    .take(4) 
    .toArray() 
    // [undefined, 0, undefined, "id"] 
    .map(x => { type: COMPLETED, id: x[3] }); 
+1

爲什麼要移除'take()'防止'toArray()'被調用?那麼這個可觀察的事物何時完成? 'toArray()'只適用於已完成的觀測值,不能返回具有不完整流的值。因此,如果流沒有固有的結束(即,如果它被綁定到一個事件監聽器等等),那麼'toArray'永遠不會被調用。但是,當4個項目被髮射時,'take()'使流「完成」。 – aaronofleonard

回答

2

有一對夫婦的事情,我看到:

你最終.map()缺少括號,這在目前的形式是一個語法錯誤,但一個微妙的變化可以使它的偶然一個labeled statement而不是返回一個對象。因爲在目前的形式下它是一個語法錯誤,我想這只是這篇文章中的一個錯誤,而不是你的代碼(它甚至不會運行),但仔細檢查!

// before 
.map(x => { type: COMPLETED, id: x[3] }); 

// after 
.map(x => ({ type: COMPLETED, id: x[3] })); 

隨着該固定的例子並用一個簡單的終極版,可觀察運行測試用例:http://jsbin.com/hunale/edit?js,output因此,如果沒有什麼顯着的我做了不同於你,問題似乎是在代碼中沒有提供。隨意添加更多的洞察力,甚至更好,在我們的JSBin/git回購中重現它。你沒有提到,但是是非常非常值得一提的


一件事是,在終極版,可觀察到的,你的史詩通常是長期存在的「過程管理」。這個史詩將實際上只處理其中一個保存,然後完成(),這可能不是你真正想要的?用戶每次啓動應用程序只能保存一次嗎?似乎不太可能。

取而代之的是,您需要通過將此邏輯封裝在mergeMap之內來保持頂級流式史詩般的返回活動並監聽未來的操作。該take(4)並傳遞data.id則成爲多餘:

const saveEpic = (action$, store) => 
    action$.ofType(SAVE) 
    .mergeMap(data => 
     Rx.Observable.from([ 
     Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)), 
     Rx.Observable.timer(1000), 
     Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)) 
     ]) 
     .concatAll() 
     .toArray() 
     .map(() => ({ type: COMPLETED, id: data.id })) 
    ); 

流的這種分離是由本·萊什在他最近AngularConnect會談所描述的,在錯誤的情況下,但它仍然適用:https://youtu.be/3LKMwkuK0ZE?t=20m(不用擔心,這是不是角具體)

接下來,我想分享一些不請自來的重構建議,可以讓你的生活更容易,但可以肯定這是自以爲是可以隨意忽略:

我會重構更準確地反映視覺上的事件順序,以及降低複雜性:

const saveEpic = (action$, store) => 
    action$.ofType(SAVE) 
    .mergeMap(data => 
     Rx.Observable.from(saveClock(data.id, data.clock)) 
     .delay(1000) 
     .mergeMap(() => saveConfig(data.id, data.config)) 
     .map(() => ({ type: COMPLETED, id: data.id })) 
    ); 

在這裏,我們就吃下由saveClock返回的承諾,延緩它是1000毫秒輸出,該mergeMapping結果向saveConfig()呼叫也返回將被消耗的承諾。然後最終將結果映射到我們的COMPLETE操作。

最後,請記住,如果你的史詩確實活着,並長期生活,沒有什麼在這個史詩般的,是從接收多個SAVE請求停止,而其他的人仍然在飛行中還是有不但耗盡了請求之間所需的1000ms延遲。即如果確實需要任何請求之間的1000ms空間,那麼您的史詩本身並不會完全阻止您的UI代碼破壞它。在這種情況下,您可能需要考慮添加更復雜的緩衝機制,例如使用.zip()運算符和BehaviorSubject

http://jsbin.com/waqipol/edit?js,output

const saveEpic = (action$, store) => { 
    // used to control how many we want to take, 
    // the rest will be buffered by .zip() 
    const requestCount$ = new Rx.BehaviorSubject(1) 
    .mergeMap(count => new Array(count)); 

    return action$.ofType(SAVE) 
    .zip(requestCount$, action => action) 
    .mergeMap(data => 
     Rx.Observable.from(saveClock(data.id, data.clock)) 
     .delay(1000) 
     .mergeMap(() => saveConfig(data.id, data.config)) 
     .map(() => ({ type: COMPLETED, id: data.id })) 
     // we're ready to take the next one, when available 
     .do(() => requestCount$.next(1)) 
    ); 
}; 

這使得它,這樣的請求,以保存進來,而我們仍在處理現有的緩衝,而我們只需要其中的一個在一個時間。請記住,這是一個無限制的緩衝區 - 意味着掛起的動作隊列可能比緩衝區刷新快得多。這是不可避免的,除非你採用了有損背壓的策略,例如丟棄重疊的請求等。

如果您有其他史詩需要重複發送請求超過一秒不超過一次,您需要創建一些排序的單一監督人,爲所有史詩般的保證。

這似乎都非常複雜,但也許具有諷刺意味的是,這是很多 RxJS比傳統命令代碼更容易做到。最難的部分實際上是知道模式。

+0

對不起括號,只是我在問題代碼中的一個錯誤。不過非常感謝您指出了多筆儲蓄的問題。雖然我沒有想過一個長壽的史詩,但我確實阻止了用戶調度多個「SAVE」操作。一旦分配了'SAVE'動作,UI將不允許再次保存,直到分派'COMPLETE'動作。儘管如此,我仍然覺得背壓機制非常有趣,而且雙重檢查將非常有用。幸運的是,我沒有其他史詩來查詢設備,因此我不需要任何機制來防止這種情況。 –

+0

保持史詩般的生命似乎解決了我的問題。謝謝! –

+0

不客氣! – jayphelps

相關問題