2016-01-06 82 views
4

我試圖從generateObservable從SQL數據庫的一系列(批量)記錄中,我試圖運行數據庫中的所有記錄。我在node-js上使用了ORM,Sequelize它返回包裹在promise中的記錄。從ORM返回的一系列Promise中產生一個RxJS Observable

我已經定義了一個函數,其fetchbatch()讀取下一批次,並返回一個Promise[Array[Record]]flatMap「荷蘭國際集團的結果到一個Observable

根據查詢是否沒有返回記錄,我的條件(終止)在承諾的then塊中被設置爲全局的,但回調從不被調用,只有promise被無限地返回,因此終止條件永遠不會滿意。有關如何處理這個問題的任何建議?這是代碼的要點。

function getAllPaginated(conditions) { 
    var remaining = true; 
    var batch_size = 20; 
    function condition(){ return remaining; } 
    function selector(promisedBatchOfRecords){ 
     //console.log(promisedBatchOfRecords); 
     //return Observable.fromPromise(promisedBatchOfRecords[1]); 
     return (promisedBatchOfRecords[1]); 
    } 
    function fetchBatch(batchNumberAndBatch) { // Returns [NextBatchNumber, Promise[Array[Record]]] 
     //console.log(remaining); 
     var batch_number = batchNumberAndBatch[0]; 
     var offset = (batch_number - 1) * batch_size; 
     var rs = Records.findAll({where: conditions, offset: offset, limit: batch_size}); 
     return [batch_number + 1, 
       rs.then(function(batch) { 
       console.log(batch.length); 
       if (!(batch.length > 0)){ 
        remaining = false; 
       }; 
       return batch.map(function(r){r.dataValues}); 
       })]; 
    } 
    return Observable.generate(fetchBatch([1, []]), condition, fetchBatch).flatMap(Ramda.identity/*over the promise*/).flatMap(Ramda.identity/*over the list*/); 
    } 
var o = getAllPaginated({where: {a: "b"}}) 
o.subScribeOnNext(console.log) 
+0

'回報batch.map(函數(R){r.dataValues});'這只是映射到所有值未定義。你錯過了一個返回或者只是一個'r => r.dataValues'。由於Sequelize使用藍鳥,你可以使用'Promise.map',這更容易。 –

+0

感謝@BenjaminGruenbaum,剛剛進入Scala的JS世界,您是否還知道可以通過每次執行一個函數來生成''''''create'''一個Observable的方法? –

+0

您可以執行'new Observable'並在構造函數中傳遞源代碼。 –

回答

0

你可以嘗試這樣的事情:

const result = new Rx.Subject; 
 
const batch_size = 3; 
 

 
// Init the recursion 
 
whileFind(0) 
 
    .subscribe(); 
 

 
// Grab the result here 
 
result 
 
    .mergeAll() 
 
    .map(batch => batch.dataValues) 
 
    .subscribe(value => console.log(value)); 
 
    
 
// Recursion function 
 
function whileFind(offset) { 
 
    return Rx.Observable.fromPromise(findAll(offset)) 
 
    .concatMap(batch => { 
 
     if (batch.length <= 0) { // Stop condition 
 
     return Rx.Observable.of(null); 
 
     } 
 
     else { 
 
     result.next(batch); // Push the chunk to the result 
 
     return whileFind(offset + batch_size); 
 
     } 
 
    }); 
 
} 
 

 
// Emulate Records.findAll from your BO 
 
function findAll(offset): Promise<Object[]> { 
 
    const data = [ 
 
    { dataValues: 1 }, 
 
    { dataValues: 2 }, 
 
    { dataValues: 3 }, 
 
    { dataValues: 4 }, 
 
    { dataValues: 5 }, 
 
    { dataValues: 6 }, 
 
    { dataValues: 7 }, 
 
    { dataValues: 8 }, 
 
    { dataValues: 9 }, 
 
    { dataValues: 10 } 
 
    ]; 
 

 
    return Promise.resolve(data.slice(offset, offset + batch_size)); 
 
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.12/Rx.min.js"></script>

相關問題