我正在嘗試向torch-dataframe添加一個並行數據採集器,以便添加torchnet compatibility。我用tnt.ParallelDatasetIterator和changed it使:torch.serialize使用torch.serialize時出現內存不足的問題
- 基本批量加載線程
- 批次串行化之外,發送到線程
- 在線程批反序列化和轉換批量數據到張量
- 張量返回的表格中有
input
和target
鍵以匹配tnt.Engine設置。
問題發生第二次enque
被稱爲一個錯誤:.../torch_distro/install/bin/luajit: not enough memory
。我目前只使用mnist與改編的mnist-example。該enque
循環現在看起來像這樣(與調試內存輸出):
-- `samplePlaceholder` stands in for samples which have been
-- filtered out by the `filter` function
local samplePlaceholder = {}
-- The enque does the main loop
local idx = 1
local function enqueue()
while idx <= size and threads:acceptsjob() do
local batch, reset = self.dataset:get_batch(batch_size)
if (reset) then
idx = size + 1
else
idx = idx + 1
end
if (batch) then
local serialized_batch = torch.serialize(batch)
-- In the parallel section only the to_tensor is run in parallel
-- this should though be the computationally expensive operation
threads:addjob(
function(argList)
io.stderr:write("\n Start");
io.stderr:write("\n 1: " ..tostring(collectgarbage("count")))
local origIdx, serialized_batch, samplePlaceholder = unpack(argList)
io.stderr:write("\n 2: " ..tostring(collectgarbage("count")))
local batch = torch.deserialize(serialized_batch)
serialized_batch = nil
collectgarbage()
collectgarbage()
io.stderr:write("\n 3: " .. tostring(collectgarbage("count")))
batch = transform(batch)
io.stderr:write("\n 4: " .. tostring(collectgarbage("count")))
local sample = samplePlaceholder
if (filter(batch)) then
sample = {}
sample.input, sample.target = batch:to_tensor()
end
io.stderr:write("\n 5: " ..tostring(collectgarbage("count")))
collectgarbage()
collectgarbage()
io.stderr:write("\n 6: " ..tostring(collectgarbage("count")))
io.stderr:write("\n End \n");
return {
sample,
origIdx
}
end,
function(argList)
sample, sampleOrigIdx = unpack(argList)
end,
{idx, serialized_batch, samplePlaceholder}
)
end
end
end
我撒collectgarbage
並試圖刪除不需要的任何對象。存儲器輸出是相當直截了當:
Start
1: 374840.87695312
2: 374840.94433594
3: 372023.79101562
4: 372023.85839844
5: 372075.41308594
6: 372023.73632812
End
該循環的enque
功能是所述非有序功能是微不足道的(在存儲器錯誤在第二enque
拋出和):
iterFunction = function()
while threads:hasjob() do
enqueue()
threads:dojob()
if threads:haserror() then
threads:synchronize()
end
enqueue()
if table.exact_length(sample) > 0 then
return sample
end
end
end