
Python MemoryError - is there a more efficient way of handling my huge CSV file?

[Using Python 3.3] I have a huge CSV file containing XX million rows and several columns. I want to read the file, add a few calculated columns, and spit out several 'segmented' csv files. I tried the code below on a smaller test file and it does exactly what I want. But now that I load the original CSV file (about 3.2 GB) I get a MemoryError. Is there a more memory-efficient way to write the code below?

Please note that I'm quite new to Python, so there may well be things I'm not aware of.

Example input data:

email    cc nr_of_transactions last_transaction_date timebucket total_basket 
[email protected] us 2     datetime value   1   20.29 
[email protected] gb 3     datetime value   2   50.84 
[email protected] ca 5     datetime value   3   119.12 
...     ... ...     ...      ...   ... 

Here is my code:

import csv 
import scipy.stats as stats 
import itertools 
from operator import itemgetter 


def add_rankperc(filename):
    '''
    Function that calculates percentile rank of total basket value of a user
    (i.e. email) within a country. Next, it assigns the user to a rankbucket
    based on its percentile rank, using the following rules:
    Percentage rank between 75 and 100 -> top25
    Percentage rank between 25 and 74  -> mid50
    Percentage rank between 0 and 24   -> bottom25
    '''

    # Defining headers for ease of use/DictReader
    headers = ['email', 'cc', 'nr_transactions', 'last_transaction_date',
               'timebucket', 'total_basket']
    groups = []

    with open(filename, encoding='utf-8', mode='r') as f_in:
        # Input file is tab-separated, hence dialect='excel-tab'
        r = csv.DictReader(f_in, dialect='excel-tab', fieldnames=headers)
        # DictReader reads all dict values as strings, converting total_basket to a float
        dict_list = []
        for row in r:
            row['total_basket'] = float(row['total_basket'])
            # Append row to a list (of dictionaries) for further processing
            dict_list.append(row)

    # Groupby function on cc and total_basket
    for key, group in itertools.groupby(
            sorted(dict_list, key=itemgetter('cc', 'total_basket')),
            key=itemgetter('cc')):
        rows = list(group)
        for row in rows:
            # Calculates the percentile rank for each value for each country
            row['rankperc'] = stats.percentileofscore(
                [row['total_basket'] for row in rows], row['total_basket'])
            # Percentage rank between 75 and 100 -> top25
            if 75 <= row['rankperc'] <= 100:
                row['rankbucket'] = 'top25'
            # Percentage rank between 25 and 74 -> mid50
            elif 25 <= row['rankperc'] < 75:
                row['rankbucket'] = 'mid50'
            # Percentage rank between 0 and 24 -> bottom25
            else:
                row['rankbucket'] = 'bottom25'
            # Appending all rows to a list to be able to return it and use it in another function
            groups.append(row)
    return groups


def filter_n_write(data):
    '''
    Function takes input data, groups by the specified keys and outputs only
    the e-mail addresses to csv files as per the respective grouping.
    '''

    # Creating group iterator based on keys
    for key, group in itertools.groupby(
            sorted(data, key=itemgetter('timebucket', 'rankbucket')),
            key=itemgetter('timebucket', 'rankbucket')):
        # List of email addresses for this combination of grouping keys
        emails = [row['email'] for row in group]
        # Dynamically naming output file based on grouping keys
        f_out = 'output-{}-{}.csv'.format(key[0], key[1])
        with open(f_out, encoding='utf-8', mode='w') as fout:
            w = csv.writer(fout, dialect='excel', lineterminator='\n')
            # Write one email address per row; wrapping the email in a list
            # keeps the full address in a single cell
            w.writerows([email] for email in emails)


filter_n_write(add_rankperc('infile.tsv'))

Thanks in advance!


"I have a huge CSV file containing about 46 million rows and several columns"... why? That is about the least efficient way to store data... you should switch your data storage method instead of trying to work around your CSV... why not try some SQL? (or anything else that actually uses a database, or a storage method that is *meant* for storing large amounts of data, unlike a csv file) –


Because this csv is an export from a database system. The reason I'm writing a Python script is the 'grouping' and writing the output to multiple csv files. You're right that I could do this in the database system, but it would require me to download each list of email addresses separately, which could amount to as many as 180 csv files. So instead I want to write a script that does this for me. Does that make more sense? – Matthijs


Why not interact with the database directly from Python? Then just extract what you need and create your desired output/result files in the most efficient way. –

Answers

3

The pandas library (http://pandas.pydata.org/) has very nice and fast CSV reading capabilities (http://pandas.pydata.org/pandas-docs/stable/io.html#io-read-csv-table). As an added bonus you get your data as numpy arrays, which makes calculating percentiles very easy. This question discusses reading a large CSV in chunks with pandas.
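For illustration only, a minimal sketch of what that could look like. The column names follow the headers defined in the question's code, the chunk size is an arbitrary assumption, and pandas' rank(pct=True) is not exactly the same statistic as scipy's percentileofscore, so treat this as a starting point rather than a drop-in replacement:

import pandas as pd

# Read the tab-separated file in chunks to keep memory use bounded
# (chunk size chosen arbitrarily for this sketch)
chunks = pd.read_csv('infile.tsv', sep='\t', chunksize=500000,
                     names=['email', 'cc', 'nr_transactions',
                            'last_transaction_date', 'timebucket', 'total_basket'])

# Keep only the columns needed for ranking before concatenating
df = pd.concat(chunk[['email', 'cc', 'timebucket', 'total_basket']]
               for chunk in chunks)

# Percentile-style rank (0-100) of each basket value within its country
df['rankperc'] = df.groupby('cc')['total_basket'].rank(pct=True) * 100
# Bucket the ranks roughly as in the question
df['rankbucket'] = pd.cut(df['rankperc'], bins=[0, 25, 75, 100],
                          labels=['bottom25', 'mid50', 'top25'])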


Hi Robert, I've heard of pandas but have no experience with it. I will look into it and hopefully it will make sense. In the meantime, any other help with the current problem would be much appreciated. – Matthijs

2

I agree with Inbar Rose that you would be better off using database functionality to attack this problem. Assuming we need to answer the question as asked, though, I think we can, at the cost of speed.

You're probably running out of memory while building the list of dictionaries of all the rows. We can work around this by only considering a subset of the rows at a time.

Here is my code for the first step, roughly your add_rankperc function:

import csv
from scipy.stats import percentileofscore
from operator import itemgetter

# Run through the whole file once, saving each row to a file corresponding to
# its 'cc' column
cc_dict = {}
with open(input_path, encoding="utf-8", mode='r') as infile:
    csv_reader = csv.reader(infile, dialect="excel-tab")
    for row in csv_reader:
        cc = row[1]
        if cc not in cc_dict:
            intermediate_path = "intermediate_cc_{}.txt".format(cc)
            outfile = open(intermediate_path, mode='w', newline='')
            csv_writer = csv.writer(outfile)
            cc_dict[cc] = (intermediate_path, outfile, csv_writer)
        _ = cc_dict[cc][2].writerow(row)

# Close the output files
for cc in cc_dict.keys():
    cc_dict[cc][1].close()

# Run through the whole file once for each 'cc' value
for cc in cc_dict.keys():
    intermediate_path = cc_dict[cc][0]
    with open(intermediate_path, mode='r', newline='') as infile:
        csv_reader = csv.reader(infile)
        # Pick out all of the rows with the 'cc' value under consideration
        group = [row for row in csv_reader if row[1] == cc]
        # Get the 'total_basket' values for the group
        A_scores = [float(row[5]) for row in group]
        for row in group:
            # Compute this row's 'total_basket' score based on the rest of the
            # group's
            p = percentileofscore(A_scores, float(row[5]))
            row.append(p)
            # Categorize the score
            bucket = ("bottom25" if p < 25 else ("mid50" if p < 75 else "top100"))
            row.append(bucket)
    # Save the augmented rows to the output file
    with open(output_path, mode='a', newline='') as outfile:
        csv_writer = csv.writer(outfile)
        csv_writer.writerows(group)

46 million rows is a lot, so this will probably be slow. I avoided the csv module's DictReader functionality and indexed into the rows directly to avoid that overhead. I also computed the first argument to percentileofscore once for each group rather than once for every row in the group.

If this works, then I think you can follow the same idea for the filter_n_write function: run through the generated file once, picking out the (timebucket, rank) pairs, and then run through it again once for each pair.
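For what it's worth, a minimal sketch of that second step might look like the following. It assumes the column layout produced by the code above (timebucket at index 4, the appended rank bucket at index 7) and reuses its output_path variable; adjust the indices if your file differs:

import csv

# First pass: collect the distinct (timebucket, rankbucket) pairs
pairs = set()
with open(output_path, mode='r', newline='') as infile:
    for row in csv.reader(infile):
        pairs.add((row[4], row[7]))

# One pass per pair, writing only the matching email addresses
for timebucket, rankbucket in pairs:
    out_name = 'output-{}-{}.csv'.format(timebucket, rankbucket)
    with open(out_name, mode='w', newline='') as outfile:
        writer = csv.writer(outfile, dialect='excel')
        with open(output_path, mode='r', newline='') as infile:
            for row in csv.reader(infile):
                if (row[4], row[7]) == (timebucket, rankbucket):
                    writer.writerow([row[0]])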


Hi Bo, your code works and solves the MemoryError problem. But now I've run into another one, namely that the script takes very long to finish; it has been running for several hours now. Looking into other solutions. – Matthijs


I edited my solution so that instead of spinning through all 46M rows once for every 'cc' value, it uses a bunch of intermediate files. Does that help? – bbayles
