2016-11-18

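Memory error when reading a large CSV file into a dictionary

I am a PhD student studying market microstructure. I need to process very large datasets (several hundred GB of millisecond data). I have been using SAS, which is very good at handling big data in data-frame format, but it is expensive. I would like to use Python for my studies and research. I have some Python skills, but they are not advanced. I have heard that pandas is very efficient at handling data frames, but it is limited to RAM, which does not suit my purposes.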

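What I have tried: I iterated over the data row by row, processed each row and stored the results in a dictionary, but this runs into memory limits. I get a memory error, and I can see Python eating all of the RAM (I have 32 GB). This dataset (500 MB) is still very small compared with the data I will process later (50 to 100 GB). In addition, some things are hard to do row by row, such as regressions and charts. So my question is: how should I process and store this data?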

The input data looks like this:

#RIC Date[L]  Time[L] Type Price Volume Bid Price Ask Price 
TPI.AX 20140820 00:11.7 Quote        0.91 
TPI.AX 20140820 00:11.7 Trade 0.91 10000  
TPI.AX 20140820 00:21.5 Quote        0.91 
TPI.AX 20140820 00:22.1 Quote     0.905 
TPI.AX 20140820 00:42.2 Quote     0.905 
TPI.AX 20140820 00:42.6 Trade 0.9075 117  
TPI.AX 20140820 00:43.1 Trade 0.9075 495  
TPI.AX 20140820 00:49.6 Quote     0.905 
TPI.AX 20140820 00:57.6 Quote     0.905 
TPI.AX 20140820 00:57.6 Quote     0.905 
TPI.AX 20140820 00:58.3 Quote     0.905 
TPI.AX 20140820 01:02.6 Quote        0.91 
TPI.AX 20140820 01:02.6 Quote        0.91 
TPI.AX 20140820 01:02.6 Quote     0.905 
TPI.AX 20140820 01:02.6 Trade 0.91 9365   
TPI.AX 20140820 01:02.6 Trade 0.91 9041   

Here is my code:

import csv
from collections import defaultdict
from math import ceil

# time_to_milli() is defined elsewhere (not shown); it converts the 'Time[L]'
# string to a number on the same scale as the 16:00 cutoff used below.


def spread_calculation(input_file_list, output_file):
    """This function calculates the spreads for securities in input_file_list
    input: trade and quote data from TRTH
    2 parameters: 1. list of file names, 2. output file name
    output: csv file contains spreads"""
    # Set variables:
    date = None
    exchange_bbo = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))))
    effective_spread = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))))
    # integer division: range() needs an int bound ('/' returns a float in Python 3)
    time_bucket = [i * 100000.0 for i in range(0, (16 * 60 * 60 * 1000) * 1000 // 100000)]
    for file in input_file_list:
        file_to_open = '%s.csv' % file
        with open(file_to_open, newline='') as f:
            for i in csv.DictReader(f):
                if not date:
                    date = i['Date[L]'][0:4] + "-" + i['Date[L]'][4:6] + "-" + i['Date[L]'][6:8]
                if i['Type'] == 'Quote' and time_to_milli(i['Time[L]']) <= (16 * 60 * 60 * 1000) * 1000:
                    security, exchange = i['#RIC'].split('.')[:2]
                    timestamp = float(time_to_milli(i['Time[L]']))
                    bucket = ceil(timestamp / 100000.0) * 100000.0
                    bid = float(i['Bid Price']) if i['Bid Price'] else 0.0
                    ask = float(i['Ask Price']) if i['Ask Price'] else 0.0
                    if bid < ask < 199999.99:
                        bbo = exchange_bbo[security][exchange][date][bucket]
                        # keep the latest quote in the bucket (smallest bucket - timestamp)
                        if not bbo['ask'] or bbo['diff_ask'] > bucket - timestamp:
                            bbo['ask'] = ask
                            bbo['diff_ask'] = bucket - timestamp
                        if not bbo['bid'] or bbo['diff_bid'] > bucket - timestamp:
                            bbo['bid'] = bid
                            bbo['diff_bid'] = bucket - timestamp
                # compare the parsed price, not the raw string, with 0.0
                if i['Type'] == 'Trade' and i['Price'] and float(i['Price']) != 0.0:
                    security, exchange = i['#RIC'].split('.')[:2]
                    timestamp = float(time_to_milli(i['Time[L]']))
                    bucket = ceil(timestamp / 100000.0) * 100000.0
                    price = float(i['Price'])
                    volume = float(i['Volume'])
                    bbo = exchange_bbo[security][exchange][date][bucket]
                    # keep the latest trade in the bucket
                    if not bbo['price'] or bbo['time_diff'] > bucket - timestamp:
                        bbo['price'] = price
                        bbo['volume'] = volume
                        bbo['time_diff'] = bucket - timestamp

    # Fill the empty buckets - exchange level (carry the last bid/ask forward).
    # This touches all 576,000 buckets for every security/exchange/day and creates
    # a dict for each one, which likely accounts for much of the memory use.
    for security in exchange_bbo:
        for exchange in exchange_bbo[security]:
            for buckets in exchange_bbo[security][exchange].values():
                for bucket in time_bucket:
                    previous = bucket - 100000.0
                    # best offer
                    if buckets[bucket]['ask'] == 0.0 and buckets[previous]['ask'] != 0.0:
                        buckets[bucket]['ask'] = buckets[previous]['ask']
                    # best bid
                    if buckets[bucket]['bid'] == 0.0 and buckets[previous]['bid'] != 0.0:
                        buckets[bucket]['bid'] = buckets[previous]['bid']

    for security in exchange_bbo:
        for exchange in exchange_bbo[security]:
            for day in exchange_bbo[security][exchange]:
                for bucket, bbo in exchange_bbo[security][exchange][day].items():
                    # only buckets that contain a trade
                    if bbo['price']:
                        midpoint = (bbo['ask'] + bbo['bid']) / 2.0
                        if bbo['price'] > 0.0 and midpoint != 0.0:
                            spread = effective_spread[security][exchange][day][bucket]
                            # 2|p - m| / m is a fraction; multiply by 10,000 for basis points
                            spread['espread_bps'] = 2.0 * abs(bbo['price'] - midpoint) / midpoint
                            spread['volume'] = bbo['volume']
                            # counted per bucket, as read back by the writer below
                            spread['count'] += 1.0

    with open(output_file, 'w', newline='') as out:
        data_writer = csv.DictWriter(out, fieldnames=['security', 'exchange', 'date', 'bucket',
                                                      'espread_bps', 'volume', 'count'])
        data_writer.writeheader()
        for security in effective_spread:
            for exchange in effective_spread[security]:
                for day in effective_spread[security][exchange]:
                    for bucket, row in effective_spread[security][exchange][day].items():
                        data_writer.writerow({'security': security, 'exchange': exchange, 'date': day,
                                              'bucket': bucket, 'espread_bps': row['espread_bps'],
                                              'volume': row['volume'], 'count': row['count']})


input_files = ['ScandinavianTAQ']

Thanks so much.

Answers

Answer 1

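100 GB is not that much data. A SQL database plus Pandas should be all you need. You will need to learn how to write SQL queries, and I recommend getting a copy of Wes McKinney's book. I haven't looked at your code, but it looks to me like the biggest problem is that you are processing everything row by row instead of grouping your operations.
Also, check out Dask.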
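A minimal sketch of the SQL-plus-pandas idea, assuming the rows have already been parsed into tuples with a numeric timestamp. SQLite (Python's built-in sqlite3) stands in for whichever database you pick, and the table name taq, its columns, and the functions load_rows, effective_spreads and volume_by_ric are all hypothetical. The database does the filtering and the GROUP BY aggregation; pandas then matches every trade of one security to the latest quote midpoint in a single merge_asof call instead of a Python loop over rows.

```python
import sqlite3

import pandas as pd


def load_rows(conn, rows):
    """Store (ric, ts, type, price, volume, bid, ask) tuples in a SQLite table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS taq "
        "(ric TEXT, ts REAL, type TEXT, price REAL, volume REAL, bid REAL, ask REAL)"
    )
    conn.executemany("INSERT INTO taq VALUES (?, ?, ?, ?, ?, ?, ?)", rows)
    conn.commit()


def effective_spreads(conn, ric):
    """Effective spread of every trade in `ric`, against the latest quote midpoint."""
    # The WHERE clause means only one security is pulled into RAM at a time.
    quotes = pd.read_sql_query(
        "SELECT ts, bid, ask FROM taq WHERE ric = ? AND type = 'Quote' ORDER BY ts",
        conn, params=(ric,))
    trades = pd.read_sql_query(
        "SELECT ts, price, volume FROM taq WHERE ric = ? AND type = 'Trade' ORDER BY ts",
        conn, params=(ric,))
    # Quote rows may carry only one side (as in the sample), so carry each side forward.
    quotes[["bid", "ask"]] = quotes[["bid", "ask"]].astype(float).ffill()
    quotes["mid"] = (quotes["bid"] + quotes["ask"]) / 2.0
    quotes = quotes.dropna(subset=["mid"])
    # Match every trade to the last quote at or before it, in one vectorized call.
    merged = pd.merge_asof(trades, quotes[["ts", "mid"]], on="ts", direction="backward")
    # 2|p - m| / m as a fraction of the midpoint
    merged["espread"] = 2.0 * (merged["price"] - merged["mid"]).abs() / merged["mid"]
    return merged


def volume_by_ric(conn):
    """Let the database aggregate with GROUP BY instead of looping in Python."""
    return pd.read_sql_query(
        "SELECT ric, COUNT(*) AS n_trades, SUM(volume) AS volume "
        "FROM taq WHERE type = 'Trade' GROUP BY ric ORDER BY ric",
        conn)
```

For data on the scale described in the question, you would load each file once and add an index on (ric, ts) so the per-security queries stay fast; time bucketing can then be a groupby on the merged frame.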

Answer 2

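If you plan to work with large, externally stored dictionaries, I would check out Elastic. It works well with big data and has a moderate learning curve.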

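For files larger than memory, you can look at memmap and lazy reading, which let you take the data in one line at a time. In general, iterating is an acceptable approach.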
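A minimal sketch of lazy reading, assuming a comma-separated file with the header shown in the question. numpy.memmap applies to fixed-width binary arrays rather than CSV text, so this covers only the lazy-reading half. The function names are hypothetical. The key point is that memory grows with the number of results kept (one running total per RIC), not with the number of rows; the second function shows that pandas can also stream a file through read_csv's chunksize parameter instead of loading it whole.

```python
import csv
from collections import defaultdict

import pandas as pd


def iter_rows(path):
    """Lazily yield one row (a dict) at a time; the file is never loaded whole."""
    with open(path, newline="") as f:
        yield from csv.DictReader(f)


def trade_volume_by_ric(path):
    """Running total per security: memory grows with the number of RICs, not rows."""
    totals = defaultdict(float)
    for row in iter_rows(path):
        if row["Type"] == "Trade" and row["Volume"]:
            totals[row["#RIC"]] += float(row["Volume"])
    return dict(totals)


def trade_volume_by_ric_chunked(path, chunksize=1_000_000):
    """Same totals with pandas, holding at most `chunksize` rows in memory at once."""
    totals = pd.Series(dtype=float)
    for chunk in pd.read_csv(path, chunksize=chunksize):
        trades = chunk[chunk["Type"] == "Trade"]
        # merge this chunk's per-RIC sums into the running totals
        totals = totals.add(trades.groupby("#RIC")["Volume"].sum(), fill_value=0)
    return totals
```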

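Grouping operations also helps in your situation: for example, consider whether there are independent operations that can be executed in parallel. See some example SO posts on this, such as this one. This is also worth discussing with domain experts in your field when it comes to optimizing the computation.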
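A minimal sketch of splitting the work into independent groups, assuming rows are dicts keyed by the question's column names. Grouping by security is one natural split (per-day files would be another); the function names are hypothetical, and the mean trade price is only a stand-in for the real per-security computation. Because each group depends only on its own rows, the same function can be run with the built-in map or a parallel map.

```python
from collections import defaultdict


def split_by_security(rows):
    """Group rows by RIC; each group can then be processed on its own."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["#RIC"]].append(row)
    return groups


def mean_trade_price(item):
    """Per-security work that depends only on that security's rows."""
    ric, rows = item
    prices = [float(r["Price"]) for r in rows if r["Type"] == "Trade" and r["Price"]]
    return ric, (sum(prices) / len(prices) if prices else None)


def run_all(groups, mapper=map):
    """Apply the per-security function with any map-like callable.

    Because the groups are independent, `mapper` can be swapped for a parallel
    map such as concurrent.futures.ProcessPoolExecutor().map without changing
    mean_trade_price.
    """
    return dict(mapper(mean_trade_price, groups.items()))
```

With `with ProcessPoolExecutor() as pool: result = run_all(groups, pool.map)` the groups are spread across CPU cores. When the spawn start method is used (the default on Windows and macOS), that call has to sit under an `if __name__ == "__main__":` guard.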

Do you also have access to external servers? If you do, and it is a distributed system, you have many more options.