2015-10-16 66 views
1

我正在使用SendGrid通過電子郵件接收文件。 SendGrid解析傳入的電子郵件,並以多部分形式將文件發送到我設置的端點。如何在以多部分形式處理文件流之前等待字段

我不想讓我的本地磁盤上的文件,所以我直接流到Amazon S3。這工作完美。

但是,在我可以流到S3之前,我需要掌握目標郵件地址,這樣我才能制定出正確的s3文件夾。這是在表單帖子中的名爲「to」的字段中發送的。不幸的是,這個字段有時會在文件到達後到達,因此我需要一種方法在我準備好接收流之前等待字段。

我以爲我可以將onField封裝在承諾中,並等待onFile中的to-field。但是這個概念似乎在字段到達文件後自動鎖定它。

我是新來的攤位流和承諾。如果有人能告訴我如何做到這一點,我將非常感激。

這是不工作pseudoish代碼:

function sendGridUpload(req, res, next) { 
    var busboy = new Busboy({ headers: req.headers }); 

    var awaitEmailAddress = new Promise(function(resolve, reject) { 
    busboy.on('field', function(fieldname, val, fieldnameTruncated, valTruncated) { 
     if(fieldname === 'to') { 
     resolve(val); 
     } else { 
     return; 
     } 
    }); 
    }); 


    busboy.on('file', function(fieldname, file, filename, encoding, mimetype) { 

    function findInbox(emailAddress) { 
     console.log('Got email address: ' + emailAddress); 

     ..find the inbox and generate an s3Key 
     return s3Key; 
    } 

    function saveFileStream(s3Key) { 
     ..pipe the file directly to S3 
    } 

    awaitEmailAddress.then(findInbox) 
    .then(saveFileStream) 
    .catch(function(err) { 
     log.error(err) 
    }); 
    }); 

    req.pipe(busboy); 
} 
+0

* facepalm *在重寫的某個階段,我設法重寫onData事件。如果沒有onData,則不會有數據流。重新啓動onData事件後,它實際上似乎工作。但我會真正感興趣的人更有知識的人的意見。這是一個好的解決方案嗎? – Michael

+0

另外..通過這個解決方案,我正在裝載流中的內部緩衝區,直到我得到電子郵件地址並將流釋放到S3。經過一些測試後,似乎是100k到幾MB的情侶。內部流緩衝區能夠增長多少而不會讓我陷入困境? – Michael

回答

0

我終於得到了這個工作。解決方案不是很漂亮,而且我實際上轉向了另一個概念(在帖子結尾處描述)。

緩衝輸入數據,直到「到」字段到達我用@samcday使用流緩衝區。當我掌握到現場時,我將可讀流釋放到排列成數據的管道。

這裏是代碼(有些部分省略,但基本部分在那裏)。

var streamBuffers = require('stream-buffers'); 

function postInboundMail(req, res, next) { 
    var busboy = new Busboy({ headers: req.headers}); 

    //Sometimes the fields arrives after the files are streamed. 
    //We need the "to"-field before we are ready for the files 
    //Therefore the onField is wrapped in a promise which gets 
    //resolved when the to field arrives 
    var awaitEmailAddress = new Promise(function(resolve, reject) { 
    busboy.on('field', function(fieldname, val, fieldnameTruncated, valTruncated) { 
     var emailAddress; 

     if(fieldname === 'to') { 
     try { 
      emailAddress = emailRegexp.exec(val)[1] 
      resolve(emailAddress) 
     } catch(err) { 
      return reject(err);   
     } 
     } else { 
     return; 
     } 
    }); 
    }); 


    busboy.on('file', function(fieldname, file, filename, encoding, mimetype) { 
    var inbox; 

    //I'm using readableStreamBuffer to accumulate the data before 
    //I get the email field so I can send the stream through to S3 
    var readBuf = new streamBuffers.ReadableStreamBuffer(); 

    //I have to pause readBuf immediately. Otherwise stream-buffers starts 
    //sending as soon as I put data in in with put(). 
    readBuf.pause(); 

    function getInbox(emailAddress) { 
     return model.inbox.findOne({email: emailAddress}) 
     .then(function(result) { 
     if(!result) return Promise.reject(new Error(`Inbox not found for ${emailAddress}`)) 

     inbox = result; 
     return Promise.resolve(); 
     }); 
    } 

    function saveFileStream() { 
     console.log('=========== starting stream to S3 ========= ' + filename) 

     //Have to resume readBuf since we paused it before 
     readBuf.resume(); 

     //file.save will approximately do the following: 
     // readBuf.pipe(gzip).pipe(encrypt).pipe(S3) 
     return model.file.save({ 
     inbox: inbox, 
     fileStream: readBuf 
     }); 
    } 

    awaitEmailAddress.then(getInbox) 
    .then(saveFileStream) 
    .catch(function(err) { 
     log.error(err) 
    }); 


    file.on('data', function(data) { 
     //Fill readBuf with data as it arrives 
     readBuf.put(data); 
    }); 

    file.on('end', function() { 
     //This was the only way I found to get the S3 streaming finished. 
     //Destroysoon will let the pipes finish the reading bot no more writes are allowed 
     readBuf.destroySoon() 
    }); 
    }); 


    busboy.on('finish', function() { 
    res.writeHead(202, { Connection: 'close', Location: '/' }); 
    res.end(); 
    }); 

    req.pipe(busboy); 
} 

我真的很喜歡這個解決方案的反饋,即使我沒有使用它。我有一種感覺,這可以做得更簡單和優雅。

新的解決方案: 而不是等待到現場我直接發送流到S3。我想,在輸入流和S3保存之間放置的東西越多,由於代碼中的錯誤,丟失傳入文件的風險就越高。 (SendGrid最終將重新發送該文件,如果我不跟200響應,但它需要一定的時間。)

這是我要做的事:

  1. 保存在數據庫中的文件的佔位符
  2. 管道流至S3
  3. 更新更多信息佔位符到達

該解決方案還使我有機會,因爲很容易獲得成功上傳的保持不成功的上傳佔位符將不完整。

// Michael

相關問題