4
我試圖從generate
Observable
從SQL數據庫的一系列(批量)記錄中,我試圖運行數據庫中的所有記錄。我在node-js上使用了ORM,Sequelize
它返回包裹在promise中的記錄。從ORM返回的一系列Promise中產生一個RxJS Observable
我已經定義了一個函數,其fetchbatch()
讀取下一批次,並返回一個Promise[Array[Record]]
和flatMap
「荷蘭國際集團的結果到一個Observable
。
根據查詢是否沒有返回記錄,我的條件(終止)在承諾的then塊中被設置爲全局的,但回調從不被調用,只有promise被無限地返回,因此終止條件永遠不會滿意。有關如何處理這個問題的任何建議?這是代碼的要點。
function getAllPaginated(conditions) {
var remaining = true;
var batch_size = 20;
function condition(){ return remaining; }
function selector(promisedBatchOfRecords){
//console.log(promisedBatchOfRecords);
//return Observable.fromPromise(promisedBatchOfRecords[1]);
return (promisedBatchOfRecords[1]);
}
function fetchBatch(batchNumberAndBatch) { // Returns [NextBatchNumber, Promise[Array[Record]]]
//console.log(remaining);
var batch_number = batchNumberAndBatch[0];
var offset = (batch_number - 1) * batch_size;
var rs = Records.findAll({where: conditions, offset: offset, limit: batch_size});
return [batch_number + 1,
rs.then(function(batch) {
console.log(batch.length);
if (!(batch.length > 0)){
remaining = false;
};
return batch.map(function(r){r.dataValues});
})];
}
return Observable.generate(fetchBatch([1, []]), condition, fetchBatch).flatMap(Ramda.identity/*over the promise*/).flatMap(Ramda.identity/*over the list*/);
}
var o = getAllPaginated({where: {a: "b"}})
o.subScribeOnNext(console.log)
'回報batch.map(函數(R){r.dataValues});'這只是映射到所有值未定義。你錯過了一個返回或者只是一個'r => r.dataValues'。由於Sequelize使用藍鳥,你可以使用'Promise.map',這更容易。 –
感謝@BenjaminGruenbaum,剛剛進入Scala的JS世界,您是否還知道可以通過每次執行一個函數來生成''''''create'''一個Observable的方法? –
您可以執行'new Observable'並在構造函數中傳遞源代碼。 –