2016-11-11 25 views
2

我有大約50k類似csv的空間文件,每個文件都有大約幾千萬行。第一列總是沒有空格的字符串,第二列總是正整數,並且沒有丟失數據。在這個問題中,我只對第一欄感興趣,所以請忽略第二欄。這裏有兩個這樣的csv文件的玩具示例。成千上萬個表格之間的聯合行的可伸縮方式

example1.csv

f1 x 
f2 x 
f5 x 
f7 x 
... 

example2.csv

f1 x 
f2 x 
f3 x 
f4 x 
f6 x 
... 

正如你可以看到,在這兩個文件中設置的功能有重疊,但也不盡相同。我想要做的是將所有50k csv文件中的數據進行合併,並將其轉換爲以下格式。

file_name f1 f2 f3 f4 f5 f6 f7 .... 
example1.csv 1 1 0 0 1 0 1 ... 
example2.csv 1 1 1 1 0 1 0 ... 
... 

所以基本上構建的file_name X feature_id一個矩陣,如果存在該文件中的feature_id,那麼它的1,否則爲0。這裏的分析相對簡單,重點是可擴展性,和未來項目的行數可能會達到數十億。我可以訪問具有高達1 TB或2 TB內存和100個內核的機器。所以我認爲內存約束不是一個問題,但我的天真執行如下所示在玩具示例中運行良好,但對於真正的執行速度太慢,並且在第一個文件中達到約310000行時似乎掛起,而我不知道爲什麼。 (你知道爲什麼嗎?我的直覺表明它可能與defaultdict有關,不確定它是如何實現的,使用它可能會很昂貴。)我希望解決方案相當快。該解決方案最好是在Python中,但其他語言也很好。

import os 
import gzip 
from collections import defaultdict 

import pandas as pd 


# collect paths to all csv-like files 
with open('file_list.txt') as inf: 
    inputs = [_.split() for _ in inf.readlines] 

inputs_map = dict(zip(inputs, range(len(inputs)))) 

res = defaultdict(lambda :[0] * len(inputs)) 
for k, infile in enumerate(inputs): 
    print(k, infile) 
    source_file = os.path.abspath(infile) 
    source_file_id = inputs_map[source_file] 
    # starting parsing the csv-like file 
    with gzip.open(source_file, 'rt') as inf: 
     for kl, line in enumerate(inf): 
      feature_id = line.split()[0] 
      res[feature_id][source_file_id] = 1 
      if (kl + 1) % 10000 == 0: 
       print('File {0}'.format(k), 'Line {0}'.format(kl + 1), source_file) 

df = pd.DataFrame(res) 
df.index = inputs 
print('starting writing to disk...') 
df.T.to_csv('output.csv') 
+0

對於兩個文件你可以用'貓example1.csv example2.csv行名字|剪下-d「」-f 1 |排序-u'。 AFAIK'sort'非常快速和高效,如果沒有足夠的內存使用磁盤。反覆將這個過程應用於文件對可能是一個好方法。 –

+0

你的意思是首先獲得所有行的聯合,然後遍歷每個文件?我仍然需要哪些文件包含哪些功能的信息。 – zyxue

+0

可以使用hdfs + spark?作爲可擴展性,它似乎是好工具? –

回答

2
xto1 = lambda x: 1 
def read_ex(fn): 
    s = pd.read_csv(
     fn, sep=' ', header=None, 
     index_col=0, usecols=[0, 1], 
     converters={1: xto1}, 
     names=[None, fn], 
     squeeze=True) 
    return s 

fs = ['example1.csv', 'example2.csv'] 

pd.concat([read_ex(f) for f in fs], keys=fs).unstack(fill_value=0) 

enter image description here

+0

你確定這會擴展嗎? '[read_ex(f)for fs]'對我來說看起來很棘手。 – zyxue

+0

老實說,我會試試 – zyxue

+0

,我不確定。首先嚐試使用幾個文件。 – piRSquared

相關問題