2017-05-22 92 views
0

我有相當大的csv文件,我需要操縱/逐行修改(因爲每行可能需要不同的修改規則),然後將它們寫出到另一個csv與正確格式。閱讀,格式,然後編寫大型CSV文件

目前,我有:

import multiprocessing 

def read(buffer): 
    pool = multiprocessing.Pool(4) 
    with open("/path/to/file.csv", 'r') as f: 
     while True: 
      lines = pool.map(format_data, f.readlines(buffer)) 
      if not lines: 
       break 
      yield lines 

def format_data(row): 
    row = row.split(',') # Because readlines() returns a string 
    # Do formatting via list comprehension 
    return row 

def main(): 
    buf = 65535 
    rows = read(buf) 
    with open("/path/to/new.csv",'w') as out: 
     writer = csv.writer(f, lineterminator='\n') 
     while rows: 
      try: 
       writer.writerows(next(rows)) 
      except StopIteration: 
       break 

即使我通過map使用多處理和防止內存過載有發電機,它仍然需要我超過2分鐘來處理40000行。說實話,不應該花那麼多。我甚至從生成器輸出生成了一個嵌套列表,並試圖一次性將數據寫入一個大文件,這是一個逐塊塊化的方法,並且仍然需要很長時間。我在這裏做錯了什麼?

回答

0

我已經想通了。

首先,問題出在我的format_data()函數中。它正在調用數據庫連接,每次運行它時,它都會構建數據庫連接並在每次迭代時關閉它。

我通過創建一個基本的映射通過字典爲指數更快的查找表,它支持多線程修復它。

所以,我的代碼如下所示:

import multiprocessing 

def read(buffer): 
    pool = multiprocessing.Pool(4) 
    with open("/path/to/file.csv", 'r') as f: 
     while True: 
      lines = pool.map(format_data, f.readlines(buffer)) 
      if not lines: 
       break 
      yield lines 

def format_data(row): 
    row = row.split(',') # Because readlines() returns a string 
    # Do formatting via list comprehension AND a dictionary lookup 
    # vice a database connection 
    return row 

def main(): 
    rows = read(1024*1024) 
    with open("/path/to/new.csv",'w') as out: 
     while rows: 
      try: 
       csv.writer(f, lineterminator='\n').writerows(next(rows)) 
      except StopIteration: 
       break 

我能夠解析〜150MB的文件,在不到30秒。在這裏學到的一些經驗教訓可以讓其他人希望學習。