2017-08-16 49 views
0

我正在嘗試讀取並解析一個大的csv文件,並且對於每一行,我必須執行一些異步計算並在完成操作後增加計數器。所以我創建了一個Promise p並嘗試鏈接很多.then(xxx),並在csv讀取結束時它是最後的.then(yyy)以輸出計數。承諾中的某些可執行文件未被執行

但是這個數字沒有加起來。但如果我做p = p.then(xxx)p = p.then(yyy)這個數字會加起來(對於較小的csv文件),但有時我會面對內存泄漏(對於大型csv文件)。

有什麼我做錯了嗎?

var fs = require('fs') 
const csv = require('fast-csv'); 
var Promise = require('bluebird') 
var count = 0; 
var actual = 0; 
let p = Promise.resolve(); 
const stream = fs.createReadStream(`/Users/ssmlee/Desktop/KingKong_Sims_5M.txt`); 
const csvStream = csv({ 
    delimiter: ';' 
}) 
.on('data', (row) => { 
    count++ 
    if (count % 10000 === 0) { 
     console.log(count) 
     console.log(process.memoryUsage().heapUsed) 
    } 
    p.then(() => { // instead if we do p = p.then(() => it will work correctly 
    return Promise.resolve().delay(5) 
    .then(function() { 
     actual++ 
    }) 
    }); 
}) 
.on('end',() => { 
    p.then(() => { // instead if we do p = p.then(() => it will work correctly 
    console.log(actual); // 4999977 or something like this 
    console.log(count); // 5000000 
    }); 
}); 
stream.pipe(csvStream); 
+1

你爲什麼認爲這種方案會泄漏內存? 「p = p.then(...)'沒有任何問題。該結構本身不會導致內存泄漏。 – jfriend00

+0

我在做'process.memoryUsage()。heapUsed'來檢查我的內存使用情況,結果我的內存沒有被垃圾回收。你可能會生成一個5m行的隨機文件,看到這種情況,我想不知道爲什麼。 –

+0

由於您選擇實施此方法,因此您正在使用大量內存。您可以通過適當的排序來減少內存,而不是在同一時間將數十萬個承諾放在飛行中。例如,您可以讀取第一行,暫停CSV流,執行異步操作,然後完成釋放CSV流。如果你真的想要並行執行一些操作來加速端到端的時間,但想要合理的內存使用,那麼你需要一次保留一些適度的並行操作數(如10),而不是500,000。 – jfriend00

回答

1

如果增加actual計數和延遲,但從來沒有等待的承諾(投擲then遠的結果),流可能與並非所有的增量已經發生了結束。在你的例子中,23個回調仍然在等待5ms的延遲。順便說一句,鏈接所有這些在相同的p = Promise.resolve()沒有多大意義,你可以立即執行一切。

如果你正在做p = p.then(…)你建立一個非常長的諾言鏈。這不應該泄漏任何內存,但會佔用大量內存 - 所有5ms延遲都會按順序鏈接在一起,並且腳本將至少需要25000秒才能運行。該文件在開始時讀入,生成數百萬個承諾,然後依次解析(並且可以被垃圾收集)。

要執行此順序方法執行,您可能應該使用流的背壓系統。

但你也可以等待並行的延遲,同時具有不太多活着的承諾:

p = Promise.all([p, Promise.delay(5)]).then(() => { 
    actual++; 
}); 
+0

我的理解是,對於這兩種情況,您都有一個很長的承諾鏈,承諾在所有5m承諾解決後都會解決。在第一種情況下,'.then()'內的'('end')'是最後一個鏈接的承諾。這不是兩回事嗎? –

+1

@ Shih-MinLee不,如果你沒有改變'p',那麼所有的回調函數都附加到相同的承諾中('Promise.resolve()')。這不是一條長鏈,而是一棵樹。所有回調將立即運行(只要解析了Promise.resolve()),*不等待其他*。 – Bergi

+0

Gotcha。謝謝。 –

1

好你想要的承諾並行運行,所以不能把它們連。

allp = []; 
.... 
.on('data', (row) => { 
    ... 
    allp.push(p.then(() => {...})); 
} 
... 
.on('end',() => { 
Promise.all(allp).then(() => {}) 

當然,您正在爲每個事件創建一個Promise。

如果你需要在結束之前釋放承諾,那麼你需要自己做。

既然你似乎不感興趣的承諾的返回值,但只有在它們的副作用(增加計數),你可以做

.on('data', (row) => { 
    ... 
    if (allp.length > 50) allp = [Promise.all(allp).then(()=>null)]; 
    allp.push(p.then(() => {...})); 
} 

這樣的50個承諾將進行分組,並且一旦他們解決,他們被單個承諾取代(這將進入下一個50 ...)

.then(()=>null)確保Promise.all的結果數組也被丟棄。 (而不是一個承諾,爲空將在allp)

這不取決於Promise.all的實施。如果Promise.all解決了每個承諾(並且結果可用),那麼這是完美的。

如果Promise.all等待所有50個承諾,然後釋放他們,那麼這仍然有效,除非每個組50具有一個非常長時間運行的承諾。


您可以使用延期承諾的反模式。

在開始時創建一個延期承諾。

var resolve; 
var asyncRunningCount = 1; // start with 1 
var p2 = new Promise(function() { 
    resolve = arguments[0]; 
}); 

在上數據

.on('data', (row) => { 
    ... 
    asyncRunningCount++; 
    p.then(() => {work})) 
    .then(() { 
     asyncRunningCount--; 
     if (asyncRunningCount == 0) resolve(); // no more tasks running 
    }); 
} 

.on('end',() => { 
    asyncRunningCount--; 
    // remove the 1 that was set on start. No more new tasks will be added 
    if (asyncRunningCount == 0) resolve(); // no more tasks running 
    p2.then(() => { all done }) 

在啓動時的值1防止得到解決P2,如果正在運行的任務的計數暫時下降到0。

在上(端部)的1遞減。如果所有任務完成,asyncRunningCount將爲0.這可以通過on(end)中的遞減或on(data)中的遞減來實現。

p2.then,將在所有任務完成時運行。

所有其他承諾完成後將被釋放。事實上在(數據)你不需要承諾。剛開始你的任務異步,而當異步任務完成遞減asyncRunningCount,以及0


檢查,如果數據來自於非常快,是許多承諾的並行啓動這仍然意味着。 但是,如果你沒有開始承諾,那麼你需要存儲傳入的數據,所以內存將被用於任何方式。

+0

是的,兩種解決方案都可以工作,但爲什麼這麼複雜?在第一種方法中 - 爲什麼等待50個承諾積累在數組中,而不是每次都執行'Promise.all'?在第二種方法中 - 沒有理由使用延遲反模式,只需將整個事件發射器事件移到'new Promise'回調中。 (當然,即使那樣,它仍然是重塑'Promise.all'反模式:D) – Bergi

+0

如果你爲* every *事件做了Promise.all(除了第一個可能,因爲那時只有一個),那麼你創建兩次(一次不到兩次)的許諾,就像你沒有使用Promise.all一樣。是的,只要他們完成,他們都會被釋放,所以記憶不是問題。但執行時間可能成爲一個問題... – Martin

+0

只要它具有線性時間和內存的複雜性,我通常不會在乎:-)執行時間受到異步工作的限制(OP的示例中的'delay(5)'),額外的承諾並不重要。 – Bergi