2017-03-02 69 views
0

我需要處理大量數據。對於數據的每個條目,我們需要向MySQL提交查詢。我目前的解決辦法是像如下:帶有承諾的批量模式javascript

var Q = require('q'); 
    function process(entry){ 
     ... 
     var defered=Q.defer(); 
     connection.query(sql,defered.makeNodeResolver()); 
     return defered.promise; 
    } 
    function ProcessAll(results) { 
     var waitfor=[]; 
     for(var i=0;i< results.length;i++){ 
      waitfor.push(process(results[i])); 
     } 
     Q.all(waitfor).then(function(results) { 
      notifySuc(results); 
     },function(results){ 
      notifyFail(results); 
     }); 
    } 

然而,當數據的數量是巨大的,它會因碰撞到了內存:

FATAL ERROR: Committing semi space failed. Allocation failed - process out of memory 

1: node::Abort() [node] 
2: 0x109624c [node] 
3: v8::Utils::ReportApiFailure(char const*, char const*) [node] 
4: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool) [node] 
5: v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::GCCallbackFlags) [node] 
6: v8::internal::Heap::CollectGarbage(v8::internal::GarbageCollector, char const*, char const*, v8::GCCallbackFlags) [node] 

如何我小批量分手呢?例如,我們每次可以處理1000個條目。他們都是一個我們恢復其餘的。如何用回調函數做到這一點?

另外,我們可以讓多個條目並行處理嗎?

+0

您確實需要多個查詢提交到數據庫?是不可能做出一個大的查詢,然後處理該查詢的結果? – hequ

+0

'我們可以讓多個條目得到並行處理嗎? - 呃......它們是並行處理的(儘可能使用javascript)......你的代碼正在觸發儘可能多的'connection.query',因爲有元素在數組中.. –

+0

我無法將多個查詢合併爲一個,因爲數據很大。如果我有100K條目,對於每個條目我需要寫1K字節的查詢,組合的100M查詢將太大而無法提交到數據庫。此外,分成小批量的主要目的,比如說每個1k,我只需要每次1K條目的內存,而不是一次加載所有100M。 –

回答

0

你可以使用像BaconJS這樣的東西來做到這一點。您可以使用Bacon.fromPromise(promise [,abort] [,eventTransformer])創建一個事件流,然後使用stream.bufferWithCount(count)將您的處理分批處理。 BaconJS是一個小型的功能反應規劃庫。要了解更多關於FRP的信息,您可以閱讀The Introduction to FRP快速入門。

0

如果錯誤是不是由於notifySuc您可以輕鬆地在同一時間處理一批如下

var Q = require('q'); 
function process(entry){ 
    ... 
    var defered=Q.defer(); 
    connection.query(sql,defered.makeNodeResolver()); 
    return defered.promise; 
} 
function processBatch(batch) { 
    return Q.all(batch.map(item => process(item))); 
} 
function ProcessAll(results) { 
    var batchSize = 1000; 
    var pos = 0; 
    var promises = Q([]); // initial promise of empty array 
    while (pos < results.length) { 
     (function(pos) { 
      promises = promises.then(result => processBatch(results.slice(pos, batchSize)).then(results => result.concat(results))); 
     })(pos); 
     pos += batchSize; 
    } 
    promises.then(function(results) { 
     notifySuc(results); 
    },function(results){ 
     notifyFail(results); 
    }); 
} 
+0

看起來,當results.slice同步調用時,pos已經達到results.length。因此processBatch總是傳遞一個空數組。 –

+0

ahhh poop ...忘了我的IIFE:p –