2014-02-13 32 views
4

給定一個Readable流(其可以是或process.stdin文件流),是有可能/實用pipe()到自定義Writable流,將填補兒童Writable直到一定的大小;然後關閉該子流;打開新的Writable流並繼續?的Node.js:分割一個可讀流管到多個順序寫流

(上下文是一大塊數據的從管道到CDN上傳,將其分割成,因爲它去一個合理尺寸的塊,而不必首先將數據寫入到磁盤。)

我試過創建一個處理_write函數中的子流的打開和關閉的Writable,但是當傳入的塊太大而不適合現有的子流時會出現問題:它必須將一些塊寫入舊溪流;創建新的流;然後在完成_write呼叫之前等待新流上的open事件。

另外我想過創建一個額外的DuplexTransform流緩衝管,並確保該塊竄進Writable肯定比現有的子流可以接受的金額等於或小於,給予Writable時間更改子流。

或者,這是否過度複雜,並且有一個更簡單的方法來完成原始任務?

回答

1

我會在ReadableWritable之間引入一個Transform流。在其_transform中,我會盡我所能的所有邏輯。

也許,我只會有一個Readable和一個變換。 _transform方法將創建我需要的所有可寫入流

就個人而言,我只在使用可寫入流時纔將數據轉儲到某處,並且我將處理該塊。
我儘可能避免實施_read_write並濫用轉換流。

但是,我不明白你的問題是關於尺寸的寫法。你是什​​麼意思?

+0

我們的目標是不斷讀取數據,從可讀流直到1GB傳遞到可寫流。一旦達到1GB,目標'可寫'流需要被關閉,另一個被打開,然後該過程繼續,從它從中斷的地方讀取相同的'可讀'流。 由於數據高達5GB(CDN的限制)並且涉及的雲服務器磁盤和內存空間有限,因此我不想緩衝它,然後打開Writable:它需要穩定的流量。 – gid

2

在尋找相關問題的答案時,我碰到了問題。如何解析文件並根據行中的某個類別值將它的行分割成單獨的文件。

我盡最大努力改變我的代碼,使其與您的問題更相關。但是,這很快就適應了。未經測試。將其視爲僞代碼。

var fs = require('fs'), 
    through = require('through'); 

var destCount = 0, dest, size = 0, MAX_SIZE = 1000; 

readableStream 
    .on('data', function(data) { 
    var out = data.toString() + "\n"; 
    size += out.length; 
    if(size > MAX_SIZE) { 
     dest.emit("end"); 
     dest = null; 
     size = 0; 
    } 
    if(!dest) { 
     // option 1. manipulate data before saving them. 
     dest = through(); 
     dest.pipe(fs.createWriteStream("log" + destCount)) 
     // option 2. write directly to file 
     // dest = fs.createWriteStream("log" + destCount); 
    } 
    dest.emit("data", out); 
    }) 
    .on('end', function() { 
    dest.emit('end'); 
    });