Need to merge 2 large csv files row by row in Python

I am working with two large dataset files for a project. I managed to clean the files line by line, but when I tried to apply the same logic to merge the two files on a common column, it failed. The problem is that the inner loop runs to completion first and only then does the outer loop advance (I don't know why this happens). I tried using numpy:

import numpy as np

buys = np.genfromtxt('buys_dtsep.dat', delimiter=",", dtype='str')
clicks = np.genfromtxt('clicks_dtsep.dat', delimiter=",", dtype='str')
f = open('combined.dat', 'w')
for s in clicks:
    for s2 in buys:
        pass  # process data

But loading a file with 33,000,000 entries into an array is not feasible, both because of memory limits and because of the time it would take to load the data into an array before processing it. I am trying to process the files line by line instead, to avoid running out of memory:

import csv

buys = open('buys_dtsep.dat')
clicks = open('clicks_dtsep.dat')
f = open('combined.dat', 'w')

csv_buys = csv.reader(buys)
csv_clicks = csv.reader(clicks)

for s in csv_clicks:
    print 'file 1 row x'  # to check when it loops
    for s2 in csv_buys:
        print s2[0]  # check looped data
        # do merge op

The output should be

file 1 row 0 
file 2 row 0 
... 
file 2 row x 
file 1 row 1 
and so on 

The output I am getting is

file 2 row 0 
file 2 row 1 
... 
file 2 row x 
file 1 row 0 
... 
file 1 row z 

If the problem with the loops above can be fixed, I'll be able to merge the files row by row.

Update: sample data

Sample of the buys file

420374,2014-04-06,18:44:58.314,214537888,12462,1 
420374,2014-04-06,18:44:58.325,214537850,10471,1 
281626,2014-04-06,09:40:13.032,214535653,1883,1 
420368,2014-04-04,06:13:28.848,214530572,6073,1 
420368,2014-04-04,06:13:28.858,214835025,2617,1 
140806,2014-04-07,09:22:28.132,214668193,523,1 
140806,2014-04-07,09:22:28.176,214587399,1046,1 

Sample of the clicks file

420374,2014-04-06,18:44:58,214537888,0 
420374,2014-04-06,18:41:50,214537888,0 
420374,2014-04-06,18:42:33,214537850,0 
420374,2014-04-06,18:42:38,214537850,0 
420374,2014-04-06,18:43:02,214537888,0 
420374,2014-04-06,18:43:10,214537888,0 
420369,2014-04-07,19:39:43,214839373,0 
420369,2014-04-07,19:39:56,214684513,0 

Can't you use pandas? If so, you could look at something like the chunksize parameter of read_csv (http://pandas.pydata.org/pandas-docs/stable/io.html#iterating-through-files-chunk-by-chunk), for example –
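
For reference, here is a rough sketch of the chunked reading this comment suggests; the chunk size is arbitrary and header=None is an assumption, since the sample data has no header row:

import pandas as pd

# read the large file 100,000 rows at a time instead of all at once
for chunk in pd.read_csv('clicks_dtsep.dat', header=None, chunksize=100000):
    print chunk.shape  # process each chunk here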


Could you add a few sample rows from both dat files? –

Answers

1

Hopefully the following approach will help. It aims to be faster and to lower the memory requirements:

import csv
from heapq import merge
from itertools import groupby, ifilter

def get_click_entries(key):
    with open('clicks.csv', 'rb') as f_clicks:
        for entry in ifilter(lambda x: int(x[0]) == key, csv.reader(f_clicks)):
            entry.insert(4, '')  # add empty missing column
            yield entry

# First create a set holding all column 0 click entries

with open('clicks.csv', 'rb') as f_clicks:
    csv_clicks = csv.reader(f_clicks)
    click_keys = {int(cols[0]) for cols in csv_clicks}

with open('buys.csv', 'rb') as f_buys, \
     open('clicks.csv', 'rb') as f_clicks, \
     open('merged.csv', 'wb') as f_merged:

    csv_buys = csv.reader(f_buys)
    csv_clicks = csv.reader(f_clicks)
    csv_merged = csv.writer(f_merged)

    for k, g in groupby(csv_buys, key=lambda x: int(x[0])):
        if k in click_keys:
            buys = sorted(g, key=lambda x: (x[1], x[2]))
            clicks = sorted(get_click_entries(k), key=lambda x: (x[1], x[2]))
            csv_merged.writerows(merge(buys, clicks))  # merge the two lists based on the timestamp
            click_keys.remove(k)
        csv_merged.writerows(g)  # if there were no clicks for this key, write the buy rows as-is

    # Write any remaining click entries

    for k in click_keys:
        csv_merged.writerows(get_click_entries(k))

For your two sample files, this produces the following output:

140806,2014-04-07,09:22:28.132,214668193,523,1 
140806,2014-04-07,09:22:28.176,214587399,1046,1 
281626,2014-04-06,09:40:13.032,214535653,1883,1 
420368,2014-04-04,06:13:28.848,214530572,6073,1 
420368,2014-04-04,06:13:28.858,214835025,2617,1 
420374,2014-04-06,18:41:50,214537888,,0 
420374,2014-04-06,18:42:33,214537850,,0 
420374,2014-04-06,18:42:38,214537850,,0 
420374,2014-04-06,18:43:02,214537888,,0 
420374,2014-04-06,18:43:10,214537888,,0 
420374,2014-04-06,18:44:58,214537888,,0 
420374,2014-04-06,18:44:58.314,214537888,12462,1 
420374,2014-04-06,18:44:58.325,214537850,10471,1 
420369,2014-04-07,19:39:43,214839373,,0 
420369,2014-04-07,19:39:56,214684513,,0 

It works by first creating a set of all the column 0 keys in the clicks file, which means the whole clicks file does not have to be re-read when a key is known not to be present. It then reads a group of rows with matching column 0 values from buys and reads the corresponding rows from clicks into a list. These are sorted by timestamp and merged in order. The key is then removed from the set so those clicks are not read again at the end.
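
As a small illustration of the merge step, here is how heapq.merge interleaves two lists that are each already sorted by timestamp, using rows taken from the sample data above (a sketch, not part of the answer code):

from heapq import merge

buys = [['420374', '2014-04-06', '18:44:58.314', '214537888', '12462', '1']]
clicks = [['420374', '2014-04-06', '18:41:50', '214537888', '', '0'],
          ['420374', '2014-04-06', '18:44:58', '214537888', '', '0']]

# rows compare element by element, so entries with the same key end up
# ordered by date and then time
for row in merge(buys, clicks):
    print row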


I like your approach, but since I have about 33 million entries in one dataset and a few million in the other, wouldn't creating a set take a long time? I'll also try this solution and post the results. Thanks – duckvader


You would need to re-read the clicks file on every loop anyway, so creating the set once does not take too long and avoids many unnecessary re-reads. I'm assuming neither file fits in memory. –


Thanks. I partially tried your approach and it works well :) – duckvader

1

EDIT: the OP wants to loop over the 2nd file repeatedly, so I have changed my answer.

You are looping over the rows of the first file and, inside that loop, looping over the second one. Because the csv_buys iterator is fully consumed on the first pass of the outer loop, your inner loop only does any work once.
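
A minimal demonstration of that exhaustion (a separate sketch, not code from the question):

import csv

reader = csv.reader(open('buys_dtsep.dat'))
print sum(1 for _ in reader)  # number of rows on the first pass
print sum(1 for _ in reader)  # 0 - the iterator is already exhausted

In your code, the iterator that gets exhausted is csv_buys: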

for s in csv_clicks:  # <--- looping over the 1st file works fine
    print 'file 1 row x'  # to check when it loops
    for s2 in csv_buys:  # <--- loops over the whole 2nd file and exhausts the iterator! this loop will ONLY do work once!
        print s2[0]  # check looped data
        # do merge op

What you need to do is:

for s in csv_clicks:  # <--- stays the same - works fine
    print 'file 1 row x'  # to check when it loops
    for s2 in csv.reader(open('buys_dtsep.dat')):  # <---- re-opened (and wrapped in csv.reader so s2[0] is still the first field) - now you loop from the start each time :) yay
        print s2[0]  # check looped data
        # do merge op

Warning: the code above has O(n²) complexity.

If your script ends up very slow (and it will), you will have to consider a different solution.
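
One such alternative (a sketch only, not part of this answer, assuming the buys file is the smaller of the two and fits in memory) is to index the buys file by its first column once and then stream the clicks file a single time:

import csv
from collections import defaultdict

# index the smaller file by its first column
buys_by_key = defaultdict(list)
with open('buys_dtsep.dat') as f_buys:
    for row in csv.reader(f_buys):
        buys_by_key[row[0]].append(row)

# stream the larger file once, looking up matching buys by key
with open('clicks_dtsep.dat') as f_clicks, open('combined.dat', 'wb') as f_out:
    writer = csv.writer(f_out)
    for row in csv.reader(f_clicks):
        for buy in buys_by_key.get(row[0], []):
            writer.writerow(buy + row)  # how the two rows are combined is up to you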


I need to check whether column 1 of row i in file 1 matches any row in file 2 in order to create the merged file. That's why I nested the loops. I tried to do the same job with the approach above – duckvader


@duckvader - changed my answer so it loops over the second file on every pass –


The new solution lets me access what I need. Thanks. It is quite slow, though, so I'll keep working on it until I find a better solution. – duckvader

0

I have replaced the files with StringIO instances for the example; with file objects the code looks the same.

import StringIO 

file1 = StringIO.StringIO("""420374,2014-04-06,18:44:58.314,214537888,12462,1 
420374,2014-04-06,18:44:58.325,214537850,10471,1 
281626,2014-04-06,09:40:13.032,214535653,1883,1 
420368,2014-04-04,06:13:28.848,214530572,6073,1 
420368,2014-04-04,06:13:28.858,214835025,2617,1 
140806,2014-04-07,09:22:28.132,214668193,523,1 
140806,2014-04-07,09:22:28.176,214587399,1046,1""") 

file2 = StringIO.StringIO("""420374,2014-04-06,18:44:58,214537888,0 
420374,2014-04-06,18:41:50,214537888,0 
420374,2014-04-06,18:42:33,214537850,0 
420374,2014-04-06,18:42:38,214537850,0 
420374,2014-04-06,18:43:02,214537888,0 
420374,2014-04-06,18:43:10,214537888,0 
420369,2014-04-07,19:39:43,214839373,0 
420369,2014-04-07,19:39:56,214684513,0""") 

outfile = StringIO.StringIO() 

data1_iter, skip_1 = iter(file1), False 
data2_iter, skip_2 = iter(file2), False 

while True:
    out = []
    if not skip_1:
        try:
            # the lines contain no spaces, so split()[0] is just the line minus its newline
            out.append(next(data1_iter).split()[0])
        except StopIteration:
            skip_1 = True
    if not skip_2:
        try:
            out.append(next(data2_iter).split()[0])
        except StopIteration:
            skip_2 = True

    # write one line from each file per pass, interleaving the two files
    outfile.write('\n'.join(out) + "\n")
    if skip_1 and skip_2:
        break

print(outfile.getvalue()) 

Output:

420374,2014-04-06,18:44:58.314,214537888,12462,1 
420374,2014-04-06,18:44:58,214537888,0 
420374,2014-04-06,18:44:58.325,214537850,10471,1 
420374,2014-04-06,18:41:50,214537888,0 
281626,2014-04-06,09:40:13.032,214535653,1883,1 
420374,2014-04-06,18:42:33,214537850,0 
420368,2014-04-04,06:13:28.848,214530572,6073,1 
420374,2014-04-06,18:42:38,214537850,0 
420368,2014-04-04,06:13:28.858,214835025,2617,1 
420374,2014-04-06,18:43:02,214537888,0 
140806,2014-04-07,09:22:28.132,214668193,523,1 
420374,2014-04-06,18:43:10,214537888,0 
140806,2014-04-07,09:22:28.176,214587399,1046,1 
420369,2014-04-07,19:39:43,214839373,0 
420369,2014-04-07,19:39:56,214684513,0