2012-09-17 184 views
4

我有一個簡單的程序,它讀取一個包含數百萬行的大文件,解析每行(numpy array)並轉換爲雙精度數組(python array),然後寫入hdf5 file。我多次重複這個循環。讀完每個文件後,我刪除所有對象並調用垃圾收集器。當我運行程序時,第一天解析沒有任何錯誤,但在第二天,我得到MemoryError。我監控了我的程序的內存使用情況,在解析的第一天,內存使用量約爲1.5 GB。當第一天解析完成後,內存使用率降至50 MB。現在,當第二天開始,我試圖從文件中讀取我得到的MemoryError。以下是該計劃的輸出。Python垃圾收集器的問題?

source file extracted at C:\rfadump\au\2012.08.07.txt 
parsing started 
current time: 2012-09-16 22:40:16.829000 
500000 lines parsed 
1000000 lines parsed 
1500000 lines parsed 
2000000 lines parsed 
2500000 lines parsed 
3000000 lines parsed 
3500000 lines parsed 
4000000 lines parsed 
4500000 lines parsed 
5000000 lines parsed 
parsing done. 
end time is 2012-09-16 23:34:19.931000 
total time elapsed 0:54:03.102000 
repacking file 
done 
> s:\users\aaj\projects\pythonhf\rfadumptohdf.py(132)generateFiles() 
-> while single_date <= self.end_date: 
(Pdb) c 
*** 2012-08-08 *** 
source file extracted at C:\rfadump\au\2012.08.08.txt 
cought an exception while generating file for day 2012-08-08. 
Traceback (most recent call last): 
    File "rfaDumpToHDF.py", line 175, in generateFile 
    lines = self.rawfile.read().split('|\n') 
MemoryError 

我非常肯定,Windows任務管理器顯示這個進程的內存使用量只有50 MB。看起來Python的垃圾收集器或內存管理器沒有正確計算可用內存:系統明明還有大量可用內存,它卻認爲內存不足。

有什麼想法?

編輯

添加我的代碼在這裏

我會把我的代碼部分。我是python的新手,請原諒我的python編碼風格。

模塊1

def generateFile(self, current_date): 
    try: 
     print "*** %s ***" % current_date.strftime("%Y-%m-%d") 
     weekday=current_date.weekday() 
     if weekday >= 5: 
      print "skipping weekend" 
      return 
     self.taqdb = taqDB(self.index, self.offset) 
     cache_filename = os.path.join(self.cache_dir,current_date.strftime("%Y.%m.%d.h5")) 
     outputFile = config.hdf5.filePath(self.index, date=current_date) 
     print "cache file: ", cache_filename 
     print "output file: ", outputFile 

     tempdir = "C:\\rfadump\\"+self.region+"\\" 
     input_filename = tempdir + filename 
     print "source file extracted at %s " % input_filename 

     ## universe 
     reader = rfaTextToTAQ.rfaTextToTAQ(self.tickobj) ## PARSER 
     count = 0 
     self.rawfile = open(input_filename, 'r') 
     lines = self.rawfile.read().split('|\n') 
     total_lines = len(lines) 
     self.rawfile.close() 
     del self.rawfile 
     print "parsing started" 
     start_time = dt.datetime.now() 
     print "current time: %s" % start_time 
     #while(len(lines) > 0): 
     while(count < total_lines): 
      #line = lines.pop(0) ## This slows down processing 
      result = reader.parseline(lines[count]+"|") 
      count += 1 
      if(count % 500000 == 0): 
       print "%d lines parsed" %(count) 
      if(result == None): 
       continue 
      ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = result 
      if(len(levelsUpdated) == 0 and tradeupdate == False): 
       continue 
      self.taqdb.insert(result) 

     ## write to hdf5 TODO 
     writer = h5Writer.h5Writer(cache_filename, self.tickobj) 
     writer.write(self.taqdb.groups) 
     writer.close() 

     del lines 
     del self.taqdb, self.tickobj 
     ########################################################## 
     print "parsing done." 
     end_time = dt.datetime.now() 
     print "end time is %s" % end_time 
     print "total time elapsed %s" % (end_time - start_time) 

     defragger = hdf.HDF5Defragmenter() 
     defragger.Defrag(cache_filename,outputFile) 
     del defragger 
     print "done" 
     gc.collect(2) 
    except: 
     print "cought an exception while generating file for day %s." % current_date.strftime("%Y-%m-%d") 
     tb = traceback.format_exc() 
     print tb 

模塊2 - taqdb - 存儲在數組中解析的數據

class taqDB:
    """Accumulates one day's parsed tick data, per RIC, as flat double arrays.

    self.groups maps ric -> {dataset name -> array.array('d')}; each dataset
    is a flattened sequence of fixed-width rows whose first element is the
    timestamp (seconds since midnight, plus self.offset).
    """

    def __init__(self, index, offset):
        self.index = index
        self.tickcfg = config.hdf5.getTickConfig(index)
        self.offset = offset
        # ric -> {dataset name -> array.array('d')}
        self.groups = {}

    def getGroup(self, ric):
        """Return (creating it if needed) the per-RIC dict of datasets."""
        if ric not in self.groups:
            self.groups[ric] = {}
        return self.groups[ric]

    def getOrderbookArray(self, ric, group):
        """Return the 2-D order-book row to apply this line's updates to.

        Returns a fresh template book when nothing has been stored yet,
        otherwise the last stored row rebuilt as a numpy array.  Returns
        None for index products, which have no order book.
        """
        datasetname = orderBookName
        prodtype = self.tickcfg.getProdType(ric)
        if prodtype == ProdType.INDEX:
            return None
        orderbookArrayShape = self.tickcfg.getOrderBookArrayShape(prodtype)
        if datasetname not in group:
            group[datasetname] = array.array("d")
            return self.tickcfg.getOrderBookArray(prodtype)
        orderbookArray = group[datasetname]
        if len(orderbookArray) == 0:
            return self.tickcfg.getOrderBookArray(prodtype)
        # Rebuild the most recent stored row (row width = shape[1]) so the
        # new quotes are applied on top of the latest book state.
        lastOrderbook = orderbookArray[-orderbookArrayShape[1]:]
        return np.array([lastOrderbook])

    def addToDataset(self, group, datasetname, timestamp, arr):
        """Stamp arr[0,0] with `timestamp` and append row arr[0] to the dataset."""
        if datasetname not in group:
            group[datasetname] = array.array("d")
        arr[0, 0] = timestamp
        group[datasetname].extend(arr[0])

    def addToOrderBook(self, group, timestamp, arr):
        # BUG FIX: the original called self.addToDataset(self, group, ...),
        # passing `self` twice -- a guaranteed TypeError when invoked.
        self.addToDataset(group, orderBookName, timestamp, arr)

    def insert(self, data):
        """Insert one parsed line (output of rfaTextToTAQ.parseline)."""
        ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = data
        # NOTE(review): microseconds=timestamp.microsecond/1000 feeds
        # milliseconds into the `microseconds` field -- confirm intended.
        delta = dt.timedelta(hours=timestamp.hour, minutes=timestamp.minute, seconds=timestamp.second, microseconds=(timestamp.microsecond / 1000))
        # BUG FIX: the original built the float via string concatenation,
        # float(str(delta.seconds) + '.' + str(delta.microseconds)), which
        # mis-scales any microsecond count that is not exactly 6 digits
        # (e.g. 50000us became .50000 == half a second). Compute it
        # arithmetically instead.
        timestamp = delta.seconds + delta.microseconds / 1000000.0 + self.offset
        ## write to array
        group = self.getGroup(ric)

        orderbookArray = self.getOrderbookArray(ric, group)
        # ROBUSTNESS: getOrderbookArray returns None for INDEX products;
        # the original would have raised TypeError on the line below.
        if orderbookArray is not None:
            nonzero = quotes.nonzero()
            orderbookArray[nonzero] = quotes[nonzero]
            if np.any(nonzero):
                self.addToDataset(group, orderBookName, timestamp, orderbookArray)
        if tradeupdate == True:
            self.addToDataset(group, tradeName, timestamp, trades)

模塊3-分析器

class rfaTextToTAQ:
    """RFA raw dump file reader.

    Reads a single line (record) via parseline() and returns the parsed
    fields as arrays of fid/value pairs.  Quote/trade books are kept per
    RIC across lines in self.allquotes / self.alltrades so each line's
    updates are applied on top of the previous state.
    """

    def __init__(self, tickconfig):
        # Project config object: field layouts per product type.
        self.tickconfig = tickconfig
        self.token = ''                     # characters accumulated for the current field
        self.state = ReadState.SEQ_NUM      # outer state machine position within the record
        self.fvstate = fvstate.FID          # inner state for FID/value pair parsing
        self.quotes = np.array([]) # read from tickconfig
        self.trades = np.array([]) # read from tickconfig
        self.prodtype = ProdType.STOCK      # product type of the current RIC
        self.allquotes = {}                 # ric -> persistent order-book array (carried across lines)
        self.alltrades = {}                 # ric -> [trades array, accumulated volume]
        self.acvol = 0                      # last seen accumulated volume (for trade-size deltas)
        self.levelsUpdated = []             # order-book levels touched by the current line
        self.quoteUpdate = False
        self.tradeUpdate = False
        self.depth = 0                      # order-book depth of the current RIC

    def updateLevel(self, index):
        # Record that book level `index` was touched (at most once per line).
        if(self.levelsUpdated.__contains__(index) == False):
            self.levelsUpdated.append(index)

    def updateQuote(self, fidindex, field):
        """Write self.value into the quotes book at fidindex and mark the level."""
        self.value = float(self.value)
        if(self.depth == 1):
            # Depth-1 updates: re-base the row index by the stock quote width.
            # (With depth == 1 the added term is zero; presumably this is the
            # general formula for deeper books -- confirm.)
            index = fidindex[0]+(len(self.tickconfig.stkQuotes)*(self.depth - 1))
            self.quotes[index[0]][fidindex[1][0]] = self.value
            self.updateLevel(index[0])
        else:
            self.quotes[fidindex] = self.value
            self.updateLevel(fidindex[0][0])
        self.quoteUpdate = True

    def updateTrade(self, fidindex, field):
        """Write self.value into the trades array; for the accumulated-volume
        field, store the delta against the previous accumulated volume."""
        #self.value = float(self.value)
        if(self.tickconfig.tradeUpdate(self.depth) == False):
            return
        newacvol = float(self.value)
        if(field == acvol):
            # NOTE(review): compares the raw string self.value against the
            # numeric self.acvol (Python 2 allows this) -- likely meant
            # newacvol > self.acvol; confirm.
            if(self.value > self.acvol):
                tradesize = newacvol - self.acvol
                self.acvol = newacvol
                self.trades[fidindex] = tradesize
                # A trade is only complete once no field is still zero.
                if(self.trades.__contains__(0) == False):
                    self.tradeUpdate = True
        else:
            self.trades[fidindex] = self.value
            if(not (self.trades[0,1]==0 or self.trades[0,2]==0)):
                self.tradeUpdate = True

    def updateResult(self):
        """Map the just-parsed FID/value pair to a quote or trade field and
        dispatch to updateQuote()/updateTrade().  Unknown FIDs and zero
        values are ignored."""
        field = ''
        valid, field = field_dict.FIDToField(int(self.fid), field)
        if(valid == False):
            return
        if(self.value == '0'):
            return
        if(self.prodtype == ProdType.STOCK):
            fidindex = np.where(self.tickconfig.stkQuotes == field)
            if(len(fidindex[0]) == 0):
                fidindex = np.where(self.tickconfig.stkTrades == field)
                if(len(fidindex[0]) == 0):
                    return
                else:
                    self.updateTrade(fidindex, field)
            else:
                self.updateQuote(fidindex, field)
        else:
            # Futures: same lookup against the futures field tables.
            fidindex = np.where(self.tickconfig.futQuotes == field)
            if(len(fidindex[0]) == 0):
                fidindex = np.where(self.tickconfig.futTrades == field)
                if(len(fidindex[0]) == 0):
                    return
                else:
                    self.updateTrade(fidindex, field)
            else:
                self.updateQuote(fidindex, field)

    def getOrderBookTrade(self):
        """Return (quotes book, [trades array, acvol]) for self.ric,
        creating fresh template arrays on first sight of the RIC."""
        if (self.allquotes.has_key(self.ric) == False):
            acvol = 0
            self.allquotes[self.ric] = self.tickconfig.getOrderBookArray(self.prodtype)
            trades = self.tickconfig.getTradesArray()
            self.alltrades[self.ric] = [trades, acvol]
        return self.allquotes[self.ric], self.alltrades[self.ric]

    def parseline(self, line):
        """Parse one record of the form

            seq_num,timestamp,RIC,update_type|FID,value|FID,value|...|

        character by character.  Returns None for skipped/invalid lines,
        otherwise (ric, timestamp, quotes, trades, levelsUpdated,
        tradeUpdate)."""
        # Reset per-line state; the per-RIC books persist across calls.
        self.tradeUpdate = False
        self.levelsUpdated = []
        pos = 0
        length = len(line)
        self.state = ReadState.SEQ_NUM
        self.fvstate = fvstate.FID
        self.token = ''
        ch = ''
        while(pos < length):
            prevChar = ch
            ch = line[pos]
            pos += 1
            #SEQ_NUM
            if(self.state == ReadState.SEQ_NUM):
                if(ch != ','):
                    self.token += ch
                else:
                    self.seq_num = int(self.token)
                    self.state = ReadState.TIMESTAMP
                    self.token = ''
            # TIMESTAMP
            elif(self.state == ReadState.TIMESTAMP):
                if(ch == ' '):
                    # A space separates date from time-of-day; keep only the time part.
                    self.token = ''
                elif(ch != ','):
                    self.token += ch
                else:
                    if(len(self.token) != 12):
                        # NOTE(review): the comma makes this print a tuple; the
                        # '%s' is never substituted -- confirm intended output.
                        print "Invalid timestamp format. %s. skipping line.\n", self.token
                        self.state = ReadState.SKIPLINE
                    else:
                        self.timestamp = datetime.strptime(self.token,'%H:%M:%S.%f')
                        self.state = ReadState.RIC
                    self.token = ''
            # RIC
            elif(self.state == ReadState.RIC):
                if(ch != ','):
                    self.token += ch
                else:
                    self.ric = self.token
                    self.token = ''
                    self.ric, self.depth = self.tickconfig.replaceRic(self.ric)
                    self.prodtype = self.tickconfig.getProdType(self.ric)
                    if(self.tickconfig.subscribed(self.ric)):
                        self.state = ReadState.UPDATE_TYPE
                        # Bind this RIC's persistent book/trade arrays so the
                        # update methods mutate the carried-over state.
                        self.quotes, trades = self.getOrderBookTrade()
                        self.trades = trades[0]
                        self.acvol = trades[1]
                    else:
                        self.state = ReadState.SKIPLINE
            # UPDATE_TYPE
            elif(self.state == ReadState.UPDATE_TYPE):
                if(ch != '|'):
                    self.token += ch
                else:
                    self.update_type = self.token
                    self.token = ''
                    self.state = ReadState.FVPAIRS
            #SKIPLINE
            elif(self.state == ReadState.SKIPLINE):
                return None
            # FV PAIRS
            elif(self.state == ReadState.FVPAIRS):
                # FID
                if(self.fvstate == fvstate.FID):
                    if(ch != ','):
                        if(ch.isdigit() == False):
                            # Non-digit in FID position: treat the text as a
                            # value continuation.
                            # NOTE(review): seeds from self.value (the PREVIOUS
                            # pair's value) rather than self.token -- looks like
                            # a bug; confirm against sample data.
                            self.token = self.value+ch
                            self.fvstate = fvstate.FIDVALUE
                            self.state = ReadState.FVPAIRS
                        else:
                            self.token += ch
                    else:
                        self.fid = self.token
                        self.token = ''
                        self.fvstate = fvstate.FIDVALUE
                        self.state = ReadState.FVPAIRS
                # FIDVALUE
                elif(self.fvstate == fvstate.FIDVALUE):
                    if(ch != '|'):
                        self.token += ch
                    else:
                        self.value = self.token
                        self.token = ''
                        self.state = ReadState.FVPAIRS
                        self.fvstate = fvstate.FID
                        # TODO set value
                        self.updateResult()
        # NOTE(review): if the line ends before the RIC state is reached,
        # this returns stale attributes from the previous line -- confirm.
        return self.ric, self.timestamp, self.quotes, self.trades, self.levelsUpdated, self.tradeUpdate

謝謝。

+0

我們可以看到你的代碼嗎? –

+0

我已經添加了代碼的主要部分。我希望它有幫助。 – Alok

+0

歡迎任何有關代碼的建議。碎片整理對象是一個C++模塊,它被swigged然後在python中調用。 – Alok

回答

5

The only reliable way to free memory is to terminate the process.

所以,如果你的主程序生成一個工作進程(worker process),由它完成大部分佔用內存的工作(即處理一天數據的部分),那麼當該工作進程結束時,它所使用的內存就會被釋放:

import multiprocessing as mp

def work(date):
    # Do most of the memory-intensive work here
    # (i.e. parse the day's dump file and write the HDF5 output).
    ...

# Run each day in its own short-lived process: when the child exits, ALL
# memory it allocated is returned to the OS, so heap fragmentation from one
# day's run cannot cause a MemoryError on the next day.
while single_date <= self.end_date:
    proc = mp.Process(target = work, args = (single_date,))
    proc.start()
    proc.join()  # wait for this day's worker before starting the next
+0

感謝@ubuntu。我將使用子進程方法分配大塊內存。鏈接清楚地解釋了可能是什麼問題。我在一次調用中做了大量的內存分配,由於內存碎片,可能無法使用。 – Alok