我有一個流的NodeJS管道就是有點像這樣:推動對象(或「錯誤:EOF後stream.push()」)
source.pipe(bulkElasticSearchLookup).pipe(doWork).pipe(destination)
bulkElasticSearchLookup的目的是收集傳入的對象,當它有10個時,在Elasticsearch中對全部10個進行批量搜索,然後進行10次單獨的this.push()調用以將項目轉換爲doWork變換。
這將我對Elasticsearch的調用次數減少了10倍,使我的管道線路快了約4倍。
下面是代碼:
EsBulkLookup.prototype._transform = function (chunk, encoding, callback) {
var self = this;
self.queue.push(chunk);
if (self.queue.length === self.bulkSize) {
var lookups = self._doLookups();
self._pushAll(lookups);
self.queue = [];
}
callback()
};
EsBulkLookup.prototype._pushAll = function (items) {
var self = this;
items.forEach(function (item) {
self.push(item);
});
};
當流結束,有可能是在self.queue幾個項目,所以我想這些沖洗在管道中的下一個步驟。
我嘗試這樣做:
var self = this;
this.on('end', function() {
var lookups = self._doLookups();
self._pushAll(lookups);
});
這一點讓我:
Error: stream.push() after EOF
如何從我沖洗項目轉換到下一階段的管道時,流結束?