我試圖設置一個簡單的文檔進紙器來對ElasticSearch進行基準測試,並且我選擇了NodeJS,因爲我認爲使用簡單的JSON結構是最簡單的。不幸的是,我似乎在向自己射擊。構造NodeJS異步代碼,以提高內存效率
下面是相關位:
var logResults = function (results) {
docsIndexed++;
var now = +new Date();
if (docsIndexed % 10000 === 0) {
log("Indexed " + docsIndexed + " documents in " + (now - start) + "ms");
}
}
var submitDocument = function (source, index, type) {
var doc = ejs.Document(index, type).source(source);
doc.doIndex(logResults);
}
var schemas = {};
_(10).times(function (idx) {
schemas[pickRandomWord()] = generateRandomDocumentSchema(_.random(idx, 15), 10);
});
var docCount = 0, docsIndexed = 0, start = +new Date();
Object.keys(schemas).forEach(function (schemaName, idx) {
var register = function() {
submitDocument(generateRandomDocument(schemas[schemaName]),
'documents', schemaName);
docCount++;
};
_((idx + 1) * 1000).times(register);
log("Registered " + ((idx + 1) * 1000) + " documents for indexing for schema "
+ schemaName + ". Total: " + docCount);
});
這工作正常的高達10萬條記錄的數據集,但如果我要對數百萬在我身上吹了一個內存不足的錯誤。
中的doIndex
函數是異步的,我懷疑許多對象在實際執行之前正在排隊。當這個數字變得重要時,這個過程就會消失。我不明白爲什麼在循環結束之前沒有執行回調。我想要的是一種使其同步的方法,或者爲其設置某種類型的池,以便在發送其他對象之前不排隊更多的對象。
有人可以請建議一個圖書館,可以幫助這個或更好的方式來構建代碼?謝謝。
更新
我已經試過彼得的建議,使用async.queue,我想出了這一點:
/** Submit QUANT * (idx + 1) documents for each schema into the index */
var QUANT = 100
, docCount = 0
, docsIndexed = 0
, queue = async.queue(submitDocument, 1000)
, paused = false
, start = +new Date();
queue.saturated = function() {
log("queue is full");
paused = true;
};
queue.empty = function() {
log("All items were given to workers");
paused = false;
};
Object.keys(schemas).forEach(function (schemaName, idx) {
var count = 0;
while (count < (idx + 1) * QUANT) {
if (!paused) {
queue.push({
source: generateRandomDocument(schemas[schemaName]),
index: 'documents',
type: schemaName
});
count++; docCount++;
}
};
log("Registered " + count + " documents for indexing for schema "
+ schemaName + ". Total: " + docCount);
});
如果它得到循環暫停,它掛起永遠(即調用queue.saturated
,暫停設置爲true,然後程序停留在while循環中)。 queue.empty callback
永遠不會被調用。如果我的隊列併發限制高於我想要處理的數字,則這可以正常工作 - 所有消息都按預期記錄。我應該在這裏改變什麼?
更新#2
我已經改變了使用異步循環的代碼,現在它的工作原理。我得到了一個RangeError: Maximum call stack size exceeded
錯誤,我一直在努力。
Object.keys(schemas).forEach(function (schemaName, idx) {
var count = 0, executions = 0;
async.whilst(
function() {
var test = count < (idx + 1) * QUANT;
if (!test) log("Registered " + count + " documents for indexing for schema "
+ schemaName + ". Executions: " + executions + ". Total: " + docCount);
return test;
},
function (callback) {
executions++;
if (!paused) {
queue.push({ source: generateRandomDocument(schemas[schemaName]), index: 'documents', type: schemaName });
count++; docCount++;
}
setTimeout(callback, 0);
// also tried with "return process.nextTick(callback)"
// and return callback();
// this blows up nicely with an out of memory error
},
function (err) {}
);
});
我開始感到沮喪,因爲我不認爲這個用例真的很複雜,我希望我對語言的工作原理有一個公正的理解。
謝謝,我會試試看。 –