I have a simple program that reads a large file containing a few million lines, parses each line (into numpy arrays), converts the values to arrays of doubles (python array), and then writes them to an hdf5 file. I repeat this loop several times, one pass per day of data. After reading each file, I delete all the objects and call the garbage collector. When I run the program, the first day parses without any errors, but on the second day I get a MemoryError. I monitored the memory usage of my program: during the first day's parsing, memory usage is around 1.5 GB; once the first day's parsing is done, it drops to 50 MB. Then, when the second day starts and I try to read from the file, I get a MemoryError.
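The overall structure is roughly the following. This is only a minimal sketch; read_dump_file, parse_line and write_hdf5 are hypothetical placeholders for my real parsing and writing code, which is posted in the edit below.

import gc

def process_all_days(dates):
    for current_date in dates:
        # placeholders for my real code (see the edit below for the actual methods)
        lines = read_dump_file(current_date)      # read the whole day's text file
        parsed = [parse_line(l) for l in lines]   # one parsed record (numpy arrays) per line
        write_hdf5(current_date, parsed)          # convert to double arrays and write to HDF5
        # after each day I drop every object and force a full collection
        del lines
        del parsed
        gc.collect()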
Below is the output of the program. Could this be a problem with Python's garbage collector?
source file extracted at C:\rfadump\au\2012.08.07.txt
parsing started
current time: 2012-09-16 22:40:16.829000
500000 lines parsed
1000000 lines parsed
1500000 lines parsed
2000000 lines parsed
2500000 lines parsed
3000000 lines parsed
3500000 lines parsed
4000000 lines parsed
4500000 lines parsed
5000000 lines parsed
parsing done.
end time is 2012-09-16 23:34:19.931000
total time elapsed 0:54:03.102000
repacking file
done
> s:\users\aaj\projects\pythonhf\rfadumptohdf.py(132)generateFiles()
-> while single_date <= self.end_date:
(Pdb) c
*** 2012-08-08 ***
source file extracted at C:\rfadump\au\2012.08.08.txt
cought an exception while generating file for day 2012-08-08.
Traceback (most recent call last):
File "rfaDumpToHDF.py", line 175, in generateFile
lines = self.rawfile.read().split('|\n')
MemoryError
I am quite sure that the Windows task manager shows a memory usage of 50 MB for this process. It looks as if Python's garbage collector, or its memory manager, is not calculating the available memory correctly: there should be plenty of memory available, yet it decides there is not enough.
Any ideas?
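For what it's worth, the same measurement could also be taken from inside the process rather than from Task Manager. This is only a sketch and assumes the third-party psutil package (not something my program currently uses) and its Process.memory_info() call:

import os
import psutil  # assumed third-party dependency, not part of my program

def log_memory(label):
    # resident (rss) and virtual (vms) size of the current process, in MB
    info = psutil.Process(os.getpid()).memory_info()
    print "%s: rss=%.1f MB, vms=%.1f MB" % (label, info.rss / 1e6, info.vms / 1e6)

Calling something like this right before and after the rawfile.read() line would show how much address space the process is actually holding when the MemoryError is raised.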
Edit
Adding my code here.
I will post the relevant parts of my code. I am new to Python, so please excuse my Python coding style.
Module 1
def generateFile(self, current_date):
    try:
        print "*** %s ***" % current_date.strftime("%Y-%m-%d")
        weekday=current_date.weekday()
        if weekday >= 5:
            print "skipping weekend"
            return
        self.taqdb = taqDB(self.index, self.offset)
        cache_filename = os.path.join(self.cache_dir,current_date.strftime("%Y.%m.%d.h5"))
        outputFile = config.hdf5.filePath(self.index, date=current_date)
        print "cache file: ", cache_filename
        print "output file: ", outputFile
        tempdir = "C:\\rfadump\\"+self.region+"\\"
        input_filename = tempdir + filename
        print "source file extracted at %s " % input_filename
        ## universe
        reader = rfaTextToTAQ.rfaTextToTAQ(self.tickobj)  ## PARSER
        count = 0
        self.rawfile = open(input_filename, 'r')
        lines = self.rawfile.read().split('|\n')
        total_lines = len(lines)
        self.rawfile.close()
        del self.rawfile
        print "parsing started"
        start_time = dt.datetime.now()
        print "current time: %s" % start_time
        #while(len(lines) > 0):
        while(count < total_lines):
            #line = lines.pop(0) ## This slows down processing
            result = reader.parseline(lines[count]+"|")
            count += 1
            if(count % 500000 == 0):
                print "%d lines parsed" %(count)
            if(result == None):
                continue
            ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = result
            if(len(levelsUpdated) == 0 and tradeupdate == False):
                continue
            self.taqdb.insert(result)
        ## write to hdf5 TODO
        writer = h5Writer.h5Writer(cache_filename, self.tickobj)
        writer.write(self.taqdb.groups)
        writer.close()
        del lines
        del self.taqdb, self.tickobj
        ##########################################################
        print "parsing done."
        end_time = dt.datetime.now()
        print "end time is %s" % end_time
        print "total time elapsed %s" % (end_time - start_time)
        defragger = hdf.HDF5Defragmenter()
        defragger.Defrag(cache_filename,outputFile)
        del defragger
        print "done"
        gc.collect(2)
    except:
        print "cought an exception while generating file for day %s." % current_date.strftime("%Y-%m-%d")
        tb = traceback.format_exc()
        print tb
Module 2 - taqdb - stores the parsed data in arrays
class taqDB:
    def __init__(self, index, offset):
        self.index = index
        self.tickcfg = config.hdf5.getTickConfig(index)
        self.offset = offset
        self.groups = {}
    def getGroup(self,ric):
        if (self.groups.has_key(ric) == False):
            self.groups[ric] = {}
        return self.groups[ric]
    def getOrderbookArray(self, ric, group):
        datasetname = orderBookName
        prodtype = self.tickcfg.getProdType(ric)
        if(prodtype == ProdType.INDEX):
            return
        orderbookArrayShape = self.tickcfg.getOrderBookArrayShape(prodtype)
        if(group.has_key(datasetname) == False):
            group[datasetname] = array.array("d")
            orderbookArray = self.tickcfg.getOrderBookArray(prodtype)
            return orderbookArray
        else:
            orderbookArray = group[datasetname]
            if(len(orderbookArray) == 0):
                return self.tickcfg.getOrderBookArray(prodtype)
            lastOrderbook = orderbookArray[-orderbookArrayShape[1]:]
            return np.array([lastOrderbook])
    def addToDataset(self, group, datasetname, timestamp, arr):
        if(group.has_key(datasetname) == False):
            group[datasetname] = array.array("d")
        arr[0,0]=timestamp
        a1 = group[datasetname]
        a1.extend(arr[0])
    def addToOrderBook(self, group, timestamp, arr):
        self.addToDataset(self, group, orderBookName, timestamp, arr)
    def insert(self, data):
        ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = data
        delta = dt.timedelta(hours=timestamp.hour,minutes=timestamp.minute, seconds=timestamp.second, microseconds=(timestamp.microsecond/1000))
        timestamp = float(str(delta.seconds)+'.'+str(delta.microseconds)) + self.offset
        ## write to array
        group = self.getGroup(ric)
        orderbookUpdate = False
        orderbookArray = self.getOrderbookArray(ric, group)
        nonzero = quotes.nonzero()
        orderbookArray[nonzero] = quotes[nonzero]
        if(np.any(nonzero)):
            self.addToDataset(group, orderBookName, timestamp, orderbookArray)
        if(tradeupdate == True):
            self.addToDataset(group, tradeName, timestamp, trades)
Module 3 - the parser
class rfaTextToTAQ:
    """RFA raw dump file reader. Reads a single line (record) and returns an array of fid/value pairs."""
    def __init__(self,tickconfig):
        self.tickconfig = tickconfig
        self.token = ''
        self.state = ReadState.SEQ_NUM
        self.fvstate = fvstate.FID
        self.quotes = np.array([]) # read from tickconfig
        self.trades = np.array([]) # read from tickconfig
        self.prodtype = ProdType.STOCK
        self.allquotes = {}
        self.alltrades = {}
        self.acvol = 0
        self.levelsUpdated = []
        self.quoteUpdate = False
        self.tradeUpdate = False
        self.depth = 0
    def updateLevel(self, index):
        if(self.levelsUpdated.__contains__(index) == False):
            self.levelsUpdated.append(index)
    def updateQuote(self, fidindex, field):
        self.value = float(self.value)
        if(self.depth == 1):
            index = fidindex[0]+(len(self.tickconfig.stkQuotes)*(self.depth - 1))
            self.quotes[index[0]][fidindex[1][0]] = self.value
            self.updateLevel(index[0])
        else:
            self.quotes[fidindex] = self.value
            self.updateLevel(fidindex[0][0])
        self.quoteUpdate = True
    def updateTrade(self, fidindex, field):
        #self.value = float(self.value)
        if(self.tickconfig.tradeUpdate(self.depth) == False):
            return
        newacvol = float(self.value)
        if(field == acvol):
            if(self.value > self.acvol):
                tradesize = newacvol - self.acvol
                self.acvol = newacvol
                self.trades[fidindex] = tradesize
                if(self.trades.__contains__(0) == False):
                    self.tradeUpdate = True
        else:
            self.trades[fidindex] = self.value
            if(not (self.trades[0,1]==0 or self.trades[0,2]==0)):
                self.tradeUpdate = True
    def updateResult(self):
        field = ''
        valid, field = field_dict.FIDToField(int(self.fid), field)
        if(valid == False):
            return
        if(self.value == '0'):
            return
        if(self.prodtype == ProdType.STOCK):
            fidindex = np.where(self.tickconfig.stkQuotes == field)
            if(len(fidindex[0]) == 0):
                fidindex = np.where(self.tickconfig.stkTrades == field)
                if(len(fidindex[0]) == 0):
                    return
                else:
                    self.updateTrade(fidindex, field)
            else:
                self.updateQuote(fidindex, field)
        else:
            fidindex = np.where(self.tickconfig.futQuotes == field)
            if(len(fidindex[0]) == 0):
                fidindex = np.where(self.tickconfig.futTrades == field)
                if(len(fidindex[0]) == 0):
                    return
                else:
                    self.updateTrade(fidindex, field)
            else:
                self.updateQuote(fidindex, field)
    def getOrderBookTrade(self):
        if (self.allquotes.has_key(self.ric) == False):
            acvol = 0
            self.allquotes[self.ric] = self.tickconfig.getOrderBookArray(self.prodtype)
            trades = self.tickconfig.getTradesArray()
            self.alltrades[self.ric] = [trades, acvol]
        return self.allquotes[self.ric], self.alltrades[self.ric]
    def parseline(self, line):
        self.tradeUpdate = False
        self.levelsUpdated = []
        pos = 0
        length = len(line)
        self.state = ReadState.SEQ_NUM
        self.fvstate = fvstate.FID
        self.token = ''
        ch = ''
        while(pos < length):
            prevChar = ch
            ch = line[pos]
            pos += 1
            #SEQ_NUM
            if(self.state == ReadState.SEQ_NUM):
                if(ch != ','):
                    self.token += ch
                else:
                    self.seq_num = int(self.token)
                    self.state = ReadState.TIMESTAMP
                    self.token = ''
            # TIMESTAMP
            elif(self.state == ReadState.TIMESTAMP):
                if(ch == ' '):
                    self.token = ''
                elif(ch != ','):
                    self.token += ch
                else:
                    if(len(self.token) != 12):
                        print "Invalid timestamp format. %s. skipping line.\n", self.token
                        self.state = ReadState.SKIPLINE
                    else:
                        self.timestamp = datetime.strptime(self.token,'%H:%M:%S.%f')
                        self.state = ReadState.RIC
                        self.token = ''
            # RIC
            elif(self.state == ReadState.RIC):
                if(ch != ','):
                    self.token += ch
                else:
                    self.ric = self.token
                    self.token = ''
                    self.ric, self.depth = self.tickconfig.replaceRic(self.ric)
                    self.prodtype = self.tickconfig.getProdType(self.ric)
                    if(self.tickconfig.subscribed(self.ric)):
                        self.state = ReadState.UPDATE_TYPE
                        self.quotes, trades = self.getOrderBookTrade()
                        self.trades = trades[0]
                        self.acvol = trades[1]
                    else:
                        self.state = ReadState.SKIPLINE
            # UPDATE_TYPE
            elif(self.state == ReadState.UPDATE_TYPE):
                if(ch != '|'):
                    self.token += ch
                else:
                    self.update_type = self.token
                    self.token = ''
                    self.state = ReadState.FVPAIRS
            #SKIPLINE
            elif(self.state == ReadState.SKIPLINE):
                return None
            # FV PAIRS
            elif(self.state == ReadState.FVPAIRS):
                # FID
                if(self.fvstate == fvstate.FID):
                    if(ch != ','):
                        if(ch.isdigit() == False):
                            self.token = self.value+ch
                            self.fvstate = fvstate.FIDVALUE
                            self.state = ReadState.FVPAIRS
                        else:
                            self.token += ch
                    else:
                        self.fid = self.token
                        self.token = ''
                        self.fvstate = fvstate.FIDVALUE
                        self.state = ReadState.FVPAIRS
                # FIDVALUE
                elif(self.fvstate == fvstate.FIDVALUE):
                    if(ch != '|'):
                        self.token += ch
                    else:
                        self.value = self.token
                        self.token = ''
                        self.state = ReadState.FVPAIRS
                        self.fvstate = fvstate.FID
                        # TODO set value
                        self.updateResult()
        return self.ric, self.timestamp, self.quotes, self.trades, self.levelsUpdated, self.tradeUpdate
Thanks.
Can we see your code? –
I have added the main parts of the code. I hope it helps. – Alok
Any suggestions about the code are welcome. The defragmenter object is a C++ module which is wrapped with SWIG and then called from Python. – Alok