2013-06-19 42 views
0

我試圖設置一個簡單的文檔進紙器來對ElasticSearch進行基準測試,並且我選擇了NodeJS,因爲我認爲使用簡單的JSON結構是最簡單的。不幸的是,我似乎在向自己射擊。構造NodeJS異步代碼,以提高內存效率

下面是相關位:

var logResults = function (results) { 
    docsIndexed++; 
    var now = +new Date(); 
    if (docsIndexed % 10000 === 0) { 
     log("Indexed " + docsIndexed + " documents in " + (now - start) + "ms"); 
    } 
    } 

    var submitDocument = function (source, index, type) { 
    var doc = ejs.Document(index, type).source(source); 
    doc.doIndex(logResults); 
    } 

    var schemas = {}; 
    _(10).times(function (idx) { 
    schemas[pickRandomWord()] = generateRandomDocumentSchema(_.random(idx, 15), 10); 
    }); 

    var docCount = 0, docsIndexed = 0, start = +new Date(); 
    Object.keys(schemas).forEach(function (schemaName, idx) { 
    var register = function() { 
     submitDocument(generateRandomDocument(schemas[schemaName]), 
     'documents', schemaName); 
     docCount++; 
    }; 
    _((idx + 1) * 1000).times(register); 
    log("Registered " + ((idx + 1) * 1000) + " documents for indexing for schema " 
     + schemaName + ". Total: " + docCount); 
    }); 

這工作正常的高達10萬條記錄的數據集,但如果我要對數百萬在我身上吹了一個內存不足的錯誤。

中的doIndex函數是異步的,我懷疑許多對象在實際執行之前正在排隊。當這個數字變得重要時,這個過程就會消失。我不明白爲什麼在循環結束之前沒有執行回調。我想要的是一種使其同步的方法,或者爲其設置某種類型的池,以便在發送其他對象之前不排隊更多的對象。

有人可以請建議一個圖書館,可以幫助這個或更好的方式來構建代碼?謝謝。


更新

我已經試過彼得的建議,使用async.queue,我想出了這一點:

/** Submit QUANT * (idx + 1) documents for each schema into the index */ 
    var QUANT = 100 
    , docCount = 0 
    , docsIndexed = 0 
    , queue = async.queue(submitDocument, 1000) 
    , paused = false 
    , start = +new Date(); 

    queue.saturated = function() { 
    log("queue is full"); 
    paused = true; 
    }; 
    queue.empty = function() { 
    log("All items were given to workers"); 
    paused = false; 
    }; 

    Object.keys(schemas).forEach(function (schemaName, idx) { 
    var count = 0; 
    while (count < (idx + 1) * QUANT) { 
     if (!paused) { 
     queue.push({ 
      source: generateRandomDocument(schemas[schemaName]), 
      index: 'documents', 
      type: schemaName 
     }); 
     count++; docCount++; 
     } 
    }; 
    log("Registered " + count + " documents for indexing for schema " 
     + schemaName + ". Total: " + docCount); 
    }); 

如果它得到循環暫停,它掛起永遠(即調用queue.saturated,暫停設置爲true,然後程序停留在while循環中)。 queue.empty callback永遠不會被調用。如果我的隊列併發限制高於我想要處理的數字,則這可以正常工作 - 所有消息都按預期記錄。我應該在這裏改變什麼?

更新#2

我已經改變了使用異步循環的代碼,現在它的工作原理。我得到了一個RangeError: Maximum call stack size exceeded錯誤,我一直在努力。

Object.keys(schemas).forEach(function (schemaName, idx) { 
    var count = 0, executions = 0; 
    async.whilst(
     function() { 
     var test = count < (idx + 1) * QUANT; 
     if (!test) log("Registered " + count + " documents for indexing for schema " 
      + schemaName + ". Executions: " + executions + ". Total: " + docCount); 
     return test; 
     }, 

     function (callback) { 
     executions++; 
     if (!paused) { 
      queue.push({ source: generateRandomDocument(schemas[schemaName]), index: 'documents', type: schemaName }); 
      count++; docCount++; 
     } 
     setTimeout(callback, 0); 
     // also tried with "return process.nextTick(callback)" 
     // and return callback(); 
     // this blows up nicely with an out of memory error 
     }, 
     function (err) {} 
    ); 
    }); 

我開始感到沮喪,因爲我不認爲這個用例真的很複雜,我希望我對語言的工作原理有一個公正的理解。

回答

1

最好的選擇是async.queue有一個很大的限制。只要確保你不只是在隊列已經飽和之後就將其添加到隊列中。使用隊列的飽和度作爲背壓,等待一些工作完成,然後開始排隊更多任務。該隊列具有掛鉤以支持這些關鍵事件。

+0

謝謝,我會試試看。 –