2016-02-18 46 views
0

我有一個流的NodeJS管道就是有點像這樣:推動對象(或「錯誤:EOF後stream.push()」)

source.pipe(bulkElasticSearchLookup).pipe(doWork).pipe(destination) 

bulkElasticSearchLookup的目的是收集傳入的對象,當它有10個時,在Elasticsearch中對全部10個進行批量搜索,然後進行10次單獨的this.push()調用以將項目轉換爲doWork變換。

這將我對Elasticsearch的調用次數減少了10倍,使我的管道線路快了約4倍。

下面是代碼:

EsBulkLookup.prototype._transform = function (chunk, encoding, callback) { 

    var self = this; 

    self.queue.push(chunk); 
    if (self.queue.length === self.bulkSize) { 
     var lookups = self._doLookups(); 
     self._pushAll(lookups); 
     self.queue = []; 
    } 
    callback() 
}; 

EsBulkLookup.prototype._pushAll = function (items) { 
    var self = this; 
    items.forEach(function (item) { 
     self.push(item); 
    }); 
}; 

當流結束,有可能是在self.queue幾個項目,所以我想這些沖洗在管道中的下一個步驟。

我嘗試這樣做:

var self = this; 
this.on('end', function() { 
    var lookups = self._doLookups(); 
    self._pushAll(lookups); 
}); 

這一點讓我:

Error: stream.push() after EOF 

如何從我沖洗項目轉換到下一階段的管道時,流結束?

回答

0

我的問題文字包含一條線索:「flush」。

https://nodejs.org/api/stream.html#stream_transform_flush_callback

transform._flush(callback)

callback Function Call this function (optionally with an error argument) when you are done flushing any remaining data.

EsBulkLookup.prototype._flush = function (callback) { 
    var lookups = this._doLookups(); 
    this._pushAll(lookups); 
};