2014-01-24 79 views
5

考慮下面的代碼...我想讀第5行後暫停流:暫停的readline

var fs   = require('fs'); 
var readline = require('readline'); 
var stream  = require('stream'); 
var numlines = 0; 
var instream = fs.createReadStream("myfile.json"); 
var outstream = new stream; 
var readStream = readline.createInterface(instream, outstream); 
readStream.on('line', function(line){ 
    numlines++; 
    console.log("Read " + numlines + " lines"); 
    if (numlines >= 5) { 
    console.log("Pausing stream"); 
    readStream.pause(); 
    } 
}); 

輸出(下一個複製)表明,它使閱讀後線暫停。也許readline已經在緩衝區中排隊了幾行,並將它們反饋給我......如果它繼續在後臺異步讀取,這將是有意義的,但基於文檔,我不知道適當的行爲應該是。有關如何達到預期效果的任何建議?

Read 1 lines 
Read 2 lines 
Read 3 lines 
Read 4 lines 
Read 5 lines 
Pausing stream 
Read 6 lines 
Pausing stream 
Read 7 lines 

回答

6

因此,事實證明,即使在暫停()之後,readline流也傾向於「滴漏」(即,泄漏一些額外的行)。文件沒有說清楚,但這是事實。

如果您希望暫停()切換立即出現,您必須創建自己的行緩衝區並自己累積剩餘行。

8

有點unintuitively,the pause methods does not stop queued up line events

調用rl.pause()不會立即從由readline.Interface實例被髮射暫停其它事件(包括'line')。

然而,有一個名爲line-by-line其中pause確實暫停line事件,直到它恢復第三方模塊。

var LineByLineReader = require('line-by-line'), 
    lr = new LineByLineReader('big_file.txt'); 

lr.on('error', function (err) { 
    // 'err' contains error object 
}); 

lr.on('line', function (line) { 
    // pause emitting of lines... 
    lr.pause(); 

    // ...do your asynchronous line processing.. 
    setTimeout(function() { 

     // ...and continue emitting lines. 
     lr.resume(); 
    }, 100); 
}); 

lr.on('end', function() { 
    // All lines are read, file is closed now. 
}); 

(我有一個模塊沒有隸屬關係,只是覺得處理這個問題非常有用。)

+0

感謝您的回答。出於興趣,這種要求有多普遍?我正在解析需要流式傳輸到服務器的80GB CSV。還有什麼其他用例? –

+1

@ZachSmith我發現在回調不能或不應該同步完成時(比如說,將行插入到數據庫中),可以暫停和恢復。如果您正在讀取的行比處理它們的速度快,則可能會排隊過多請求並耗盡內存。 –

0

加點分:

.on('pause', function() { 
    console.log(numlines) 
}) 

您將得到5.它在node.js document中提到:

  • 輸入流未暫停收到 SIGCONT事件。 (參見事件SIGTSTP和SIGCONT)

所以,我在行事件中創建了一個tmp緩衝區。使用標誌來確定它是否被觸發暫停。

.on('line', function(line) { 
    if (paused) { 
     putLineInBulkTmp(line); 
    } else { 
     putLineInBulk(line); 
    } 
} 

然後在上暫停和恢復:

.on('pause', function() { 
    paused = true; 
    doSomething(bulk, function(resp) { 
     // clean up bulk for the next. 
     bulk = []; 
     // clone tmp buffer. 
     bulk = clone(bulktmp); 
     bulktmp = []; 
     lr.resume(); 
    }); 
}) 
.on('resume',() => { 
    paused = false; 
}) 

用這種方式來處理這種情況。