2016-02-17 63 views
0

我從Busboy獲取文件流,然後將其管道傳輸到自定義轉換流以驗證並清除它。它適用於小文件,但隨着它們變大,我的自定義流不會等待busboy流完成並被截斷。節點轉換流不等待

這裏是打雜代碼:

busboy 
.on("file", function(fieldname, file, filename, encoding, mimetype) { 
    //Creating a mongo doc first 
    Dataset.create(dataset, function (err, ds) { 
     if(err) {...} 
     else { 
      file.pipe(validateCSV)); 
     } 
    }); 

    validateCSV 
     .on("finish", function() { 
      // Send to Data Import 
      datasetService.import(validateCSV, dataset, function (err, result) { 
       ... 
      }); 
     }); 
}); 

我的變換流:

module.exports.ValidateCSV = ValidateCSV; 
function ValidateCSV(options) { 
    if (!(this instanceof ValidateCSV)) return new ValidateCSV(options); 

    if (!options) options = {}; 
    options.objectMode = true; 
    Transform.call(this, options); 
} 

util.inherits(ValidateCSV, Transform); 

ValidateCSV.prototype._transform = function (chunk, encoding, done) { 
    if (this._checked) { 
     this.push(chunk); 
    } else { 
     //Do some validation 
     var data = chunk.toString(); 
     var lines = data.match(/[^\r\n]+/g); 
     var headerline = lines[0] || ""; 
     var header = headerline.split(","); 
     ... 
     this._checked = true; 
     this.push(chunk); 
    } 
    done() 
} 

回答

0

原來,這是一個背壓問題,seeting固定它變換流HighWaterMark選項。理想情況下,它根據上傳的文件大小設置,但是這對我來說是固定的:

function ValidateCSV(options) { 
    if (!(this instanceof ValidateCSV)) return new ValidateCSV(options); 

    if (!options) options = {}; 
    options.objectMode = true; 
    options.highWaterMark = 100000; 
    Transform.call(this, options); 
}