我是一名博士研究生,學習市場微觀結構。我需要處理非常大的數據集(幾百GB的毫秒數據)。我一直在使用SAS,這對於處理數據幀格式的大數據非常好。但是,這是昂貴的。我想用Python來進行我的學習/研究。 我在Python中有一些但並不是先進的技能。我聽說熊貓在處理數據幀方面非常有效,但它僅限於RAM,這對我的目的來說並不是很好。將大型csv文件讀入字典時出現內存錯誤
我曾嘗試過: 我試圖逐行迭代數據,處理它們並存儲到字典中,但是這具有內存限制。 我得到了內存錯誤,我可以看到Python咀嚼所有的RAM(我有32GB)。與稍後我要處理的數據(50〜100 gb)相比,該數據集仍然非常小(500 MB)。此外,還有一些難以一行一行地完成的事情,例如迴歸,圖表等。 所以我的問題是我應該如何處理和存儲這些數據?
輸入的數據是這樣的:
#RIC Date[L] Time[L] Type Price Volume Bid Price Ask Price
TPI.AX 20140820 00:11.7 Quote 0.91
TPI.AX 20140820 00:11.7 Trade 0.91 10000
TPI.AX 20140820 00:21.5 Quote 0.91
TPI.AX 20140820 00:22.1 Quote 0.905
TPI.AX 20140820 00:42.2 Quote 0.905
TPI.AX 20140820 00:42.6 Trade 0.9075 117
TPI.AX 20140820 00:43.1 Trade 0.9075 495
TPI.AX 20140820 00:49.6 Quote 0.905
TPI.AX 20140820 00:57.6 Quote 0.905
TPI.AX 20140820 00:57.6 Quote 0.905
TPI.AX 20140820 00:58.3 Quote 0.905
TPI.AX 20140820 01:02.6 Quote 0.91
TPI.AX 20140820 01:02.6 Quote 0.91
TPI.AX 20140820 01:02.6 Quote 0.905
TPI.AX 20140820 01:02.6 Trade 0.91 9365
TPI.AX 20140820 01:02.6 Trade 0.91 9041
這是我的代碼:
def spread_calculation(input_file_list, output_file):
"""This function calculates the spreads for securities in input_file_list
input: trade and quote data from TRTH
2 parameters: 1. list of file names, 2.output file name
output: csv file contains spreads"""
# Set variables:
date = None
exchange_bbo = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))))
effective_spread = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))))
time_bucket = [i * 100000.0 for i in range(0, (16 * 60 * 60 * 1000) * 1000/100000)]
for file in input_file_list:
file_to_open = '%s.csv' % file
reader = csv.DictReader(open(file_to_open, 'rb'))
for i in reader:
if not bool(date):
date = i['Date[L]'][0:4] + "-" + i['Date[L]'][4:6] + "-" + i['Date[L]'][6:8]
if i['Type'] == 'Quote' and (time_to_milli(i['Time[L]']) <= (16*60*60*1000)*1000):
security = i['#RIC'].split('.')[0]
exchange = i['#RIC'].split('.')[1]
timestamp = float(time_to_milli(i['Time[L]']))
bucket = ceil(float(time_to_milli(i['Time[L]']))/100000.0) * 100000.0
if i['Bid Price'] == "":
bid = 0.0
else:
bid = float(i['Bid Price'])
if i['Ask Price'] == "":
ask = 0.0
else:
ask = float(i['Ask Price'])
if bid < ask < 199999.99:
if not bool(exchange_bbo[security][exchange][date][bucket]['ask']):
exchange_bbo[security][exchange][date][bucket]['ask'] = ask
exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp
elif exchange_bbo[security][exchange][date][bucket]['diff_ask'] > bucket - timestamp:
exchange_bbo[security][exchange][date][bucket]['ask'] = ask
exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp
if not bool(exchange_bbo[security][exchange][date][bucket]['bid']):
exchange_bbo[security][exchange][date][bucket]['bid'] = bid
exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp
elif exchange_bbo[security][exchange][date][bucket]['diff_bid'] > bucket - timestamp:
exchange_bbo[security][exchange][date][bucket]['bid'] = bid
exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp
if i['Type'] == 'Trade' and i['Price'] != "" and i['Price'] != 0.0:
timestamp = float(time_to_milli(i['Time[L]']))
bucket = ceil(float(time_to_milli(i['Time[L]']))/100000.0) * 100000.0
security = i['#RIC'].split('.')[0]
exchange = i['#RIC'].split('.')[1]
price = float(i['Price'])
volume= float(i['Volume'])
if not bool(exchange_bbo[security][exchange][date][bucket]['price']):
exchange_bbo[security][exchange][date][bucket]['price'] = price
exchange_bbo[security][exchange][date][bucket]['volume'] = volume
exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp
elif exchange_bbo[security][exchange][date][bucket]['time_diff'] > bucket - timestamp and price != 0.0:
exchange_bbo[security][exchange][date][bucket]['price'] = price
exchange_bbo[security][exchange][date][bucket]['volume'] = volume
exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp
# Fill the empty buckets - exchange level
for security in exchange_bbo:
for exchange in exchange_bbo[security]:
for date in exchange_bbo[security][exchange]:
for bucket in time_bucket:
previous = bucket - 100000.0
# best offer
bo_t = exchange_bbo[security][exchange][date][bucket]['ask']
bo_t1 = exchange_bbo[security][exchange][date][previous]['ask']
if bo_t == 0.0 and bo_t1 != 0.0:
exchange_bbo[security][exchange][date][bucket]['ask'] = bo_t1
# best bid
bb_t = exchange_bbo[security][exchange][date][bucket]['bid']
bb_t1 = exchange_bbo[security][exchange][date][previous]['bid']
if bb_t == 0.0 and bb_t1 != 0.0:
exchange_bbo[security][exchange][date][bucket]['bid'] = bb_t1
for security in exchange_bbo:
for exchange in exchange_bbo[security]:
for date in exchange_bbo[security][exchange]:
for bucket in exchange_bbo[security][exchange][date]:
if not bool(exchange_bbo[security][exchange][date][bucket]['price']):
nbo = exchange_bbo[security][exchange][date][bucket]['ask']
nbb = exchange_bbo[security][exchange][date][bucket]['bid']
midpoint = (nbo + nbb)/2.0
price = exchange_bbo[security][exchange][date][bucket]['price']
volume= exchange_bbo[security][exchange][date][bucket]['volume']
# print security, exchange, bucket, price, midpoint
if price > 0.0 and midpoint != 0.0:
effective_spread[security][exchange][date][bucket]['espread_bps'] = 2.0 * abs(price - midpoint)/midpoint
effective_spread[security][exchange][date][bucket]['volume']=volume
effective_spread[security][exchange][date]['count'] += 1.0
data_writer = csv.DictWriter(open(output_file, 'wb'),
fieldnames=['security', 'exchange', 'date', 'bucket' 'espread_bps', 'volume', 'count'])
data_writer.writeheader()
for security in effective_spread:
for exchange in effective_spread[security]:
for date in effective_spread[security][exchange]:
for bucket in effective_spread[security][exchange][date]:
espread_bps = effective_spread[security][exchange][date][bucket]['espread_bps']
volume = effective_spread[security][exchange][date][bucket]['volume']
count = effective_spread[security][exchange][date][bucket]['count']
data_writer.writerow({'security': security, 'exchange': exchange, 'date': date, 'bucket': bucket,
'espread_bps': espread_bps, 'volume': volume, 'count': count})
input_files = ['ScandinavianTAQ']
謝謝你這麼多