2014-04-29 65 views
3

我有一個非常大的CSV文件。超過7500萬行。將Large .csv處理成Redis

我必須每隔一小時將這個.csv文件「發佈」到一個Redis羣集中。 (每小時)

腳本:

import csv 
import redis 
import random 
from redis import StrictRedis 
import multiprocessing as mp 
import itertools 
import time 

def worker(chunk): 
    return len(chunk) 

def keyfunc(row): 
    return row[0] 

def main(): 
    client = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0) 
    client1 = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0) 
    client2 = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0) 
    list1 =(client, client1, client2) 
    pool = mp.Pool() 
    largefile = 'Example.csv' 
    num_chunks = 10 
    results = [] 
    with open(largefile) as f: 
     reader = csv.reader(f) 
     chunks = itertools.groupby(reader, keyfunc) 
     while True: 
      # make a list of num_chunks chunks 
      groups = [list(chunk) for key, chunk in 
         itertools.islice(chunks, num_chunks)] 
      if groups: 
       result = pool.map(worker, groups) 
       results.extend(result) 
      else: 
       break 

    key1 = 'AAM_CDF_Traits' 
    doc = chunk 
    random.choice(list1).publish(key1, pool) 
    pool.close() 
    pool.join() 
    print(results) 

if __name__ == '__main__': 

    main() 

問題:

這是一個需要解決這個問題的正確方法?我還可以通過其他方式解決這個問題。

爲什麼我有這個錯誤?

回溯(最近通話最後一個):

文件 「./AAM_Redis4.sh」,第47行,在

main() 

文件 「./AAM_Redis4.sh」,33行,在主

itertools.islice(chunks, num_chunks)] 

類型錯誤:「元組」對象不callabl

+0

我可以誠實地說,這種方法把我的處理時間從1-2小時縮短到5-7分鐘。對記憶影響最小。 – user2748540

回答

0

,因爲你使用的內置我你可能會得到一個錯誤n功能列表作爲變量名:

list =(client, client1, client2) 

後來,當你調用函數列表,它被稱爲是一個變量,這可能會導致一個問題:

groups = [list(chunk) for key, chunk in 
+0

我完全錯過了。謝謝。 – user2748540