2016-08-21 62 views
1

我正在談論一些涉及分頁的RESTful搜索終結點。該查詢由用戶在搜索字段中鍵入而觸發,結果它產生一個具有N個值的Observable,對應於N個結果頁面。RxJS:使用switchMap生成N值的背壓

的代碼看起來有點像下面這樣:

function runQueries(queryObservable) { 
    return queryObservable 
      .debounceTime(500) 
      .distinctUntilChanged() 
      .switchMap(search); 
} 

function search(query) { 
    return Observable.create(observer => searchInto(observer, query, 0)); 
} 

function searchInto(observer, query, start) { 
    runQuery(query, start).subscribe(result => { 
    observer.next(result); 
    if (hasMorePages(result)) { 
     searchInto(observer, query, start + 1); 
    } else { 
     observer.complete(); 
    } 
    }); 
} 

現在,搜索請求可能需要一段時間,我不想要檢索的所有網頁,如果用戶改變了查詢。

假設搜索返回3個頁面,並且用戶在加載一個頁面後更改查詢。我想看到類似的東西:

USER: types query A 
CODE: loads page A1 
USER: types query B 
CODE: loads page B1 
CODE: loads page B2 
CODE: loads page B3 

switchMap完成一半的工作。由此產生的觀測值具有正確的序列:A1,B1,B2,B3。大。

但是,在幕後,我的遞歸搜索仍在運行所有查詢,從而給服務器,網絡等造成不必要的負擔。switchMap確實放棄了「過時」結果,但它並沒有阻止遞歸函數的執行工作到最後。換句話說,它看起來像:

USER: types query A 
CODE: loads page A1 -> returned by search observable 
USER: types query B 
CODE: loads page A2 -> discarded by search observable 
CODE: loads page B1 -> returned by search observable 
CODE: loads page B2 -> returned by search observable 
CODE: loads page A3 -> discarded by search observable 
CODE: loads page B3 -> returned by search observable 

「A」和「B」的順序是隨機的(受競爭條件的限制),但沒關係。

我在做什麼錯?什麼是這種慣用的解決方案?

+0

你用什麼製作的HTTP請求?在你的解決方案中,我沒有看到你在哪裏取消Observable。就像這個例子中的https://jsfiddle.net/_alexander_/r1tahjqe/3/,代碼爲'return()=> request.abort();'關於http請求 –

+0

@AlexanderT。這正是問題,我可以「取消」Observable嗎?傳遞消息的方向相反,所以遞歸生產者知道它不應該運行更多的查詢? 我正在使用Angular2 HTTP或一些自定義HTTP客戶端進行HTTP請求。我認爲這不相關。 –

回答

1

switchMap只能取消操作:生產者可以使用訂閱狀態與observer.isUnsubscribed停止其工作。由於您未在Observable.create內返回Subscription,因此無法取消正在進行的操作。

目前的情況是使用Observable.create是不是真的幫助你在這種情況下,我建議你利用expand操作來執行遞歸操作來代替:

function runQueries(queryObservable) { 
    return queryObservable 
      .debounceTime(500) 
      .distinctUntilChanged() 
      .switchMap(search); 
} 

function search(query) { 
    //Kicks off the first query 
    return runQuery(query, 0) 
     //Uses the results of the first query to see if more queries should be made 
    .expand((result, idx) => 
     //Continues to execute more queries until `hasMorePages` is false 
     hasMorePages(result) ? 
     runQuery(query, idx + 1) : 
     Observable.empty()); 
} 
1

flatMap當它啓動新的「流」時,可以退訂Observable。如果你返回一個Observable即取消

function searchInto(observer, query, start) { 
    runQuery(query, start).subscribe(result => { 
    observer.next(result); 
    if (hasMorePages(result) && !observer.isUnsubscribed) { 
     searchInto(observer, query, start + 1); 
    } else { 
     observer.complete(); 
    } 
    }); 
}