3
我有一個非常大的CSV文件。超過7500萬行。將Large .csv處理成Redis
我必須每隔一小時將這個.csv文件「發佈」到一個Redis羣集中。 (每小時)
腳本:
import csv
import redis
import random
from redis import StrictRedis
import multiprocessing as mp
import itertools
import time
def worker(chunk):
return len(chunk)
def keyfunc(row):
return row[0]
def main():
client = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0)
client1 = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0)
client2 = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0)
list1 =(client, client1, client2)
pool = mp.Pool()
largefile = 'Example.csv'
num_chunks = 10
results = []
with open(largefile) as f:
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
key1 = 'AAM_CDF_Traits'
doc = chunk
random.choice(list1).publish(key1, pool)
pool.close()
pool.join()
print(results)
if __name__ == '__main__':
main()
問題:
這是一個需要解決這個問題的正確方法?我還可以通過其他方式解決這個問題。
爲什麼我有這個錯誤?
回溯(最近通話最後一個):
文件 「./AAM_Redis4.sh」,第47行,在
main()
文件 「./AAM_Redis4.sh」,33行,在主
itertools.islice(chunks, num_chunks)]
類型錯誤:「元組」對象不callabl
我可以誠實地說,這種方法把我的處理時間從1-2小時縮短到5-7分鐘。對記憶影響最小。 – user2748540