我有大約50k類似csv的空間文件,每個文件都有大約幾千萬行。第一列總是沒有空格的字符串,第二列總是正整數,並且沒有丟失數據。在這個問題中,我只對第一欄感興趣,所以請忽略第二欄。這裏有兩個這樣的csv文件的玩具示例。成千上萬個表格之間的聯合行的可伸縮方式
example1.csv
f1 x
f2 x
f5 x
f7 x
...
example2.csv
f1 x
f2 x
f3 x
f4 x
f6 x
...
正如你可以看到,在這兩個文件中設置的功能有重疊,但也不盡相同。我想要做的是將所有50k csv文件中的數據進行合併,並將其轉換爲以下格式。
file_name f1 f2 f3 f4 f5 f6 f7 ....
example1.csv 1 1 0 0 1 0 1 ...
example2.csv 1 1 1 1 0 1 0 ...
...
所以基本上構建的file_name
X feature_id
一個矩陣,如果存在該文件中的feature_id
,那麼它的1,否則爲0。這裏的分析相對簡單,重點是可擴展性,和未來項目的行數可能會達到數十億。我可以訪問具有高達1 TB或2 TB內存和100個內核的機器。所以我認爲內存約束不是一個問題,但我的天真執行如下所示在玩具示例中運行良好,但對於真正的執行速度太慢,並且在第一個文件中達到約310000行時似乎掛起,而我不知道爲什麼。 (你知道爲什麼嗎?我的直覺表明它可能與defaultdict有關,不確定它是如何實現的,使用它可能會很昂貴。)我希望解決方案相當快。該解決方案最好是在Python中,但其他語言也很好。
import os
import gzip
from collections import defaultdict
import pandas as pd
# collect paths to all csv-like files
with open('file_list.txt') as inf:
inputs = [_.split() for _ in inf.readlines]
inputs_map = dict(zip(inputs, range(len(inputs))))
res = defaultdict(lambda :[0] * len(inputs))
for k, infile in enumerate(inputs):
print(k, infile)
source_file = os.path.abspath(infile)
source_file_id = inputs_map[source_file]
# starting parsing the csv-like file
with gzip.open(source_file, 'rt') as inf:
for kl, line in enumerate(inf):
feature_id = line.split()[0]
res[feature_id][source_file_id] = 1
if (kl + 1) % 10000 == 0:
print('File {0}'.format(k), 'Line {0}'.format(kl + 1), source_file)
df = pd.DataFrame(res)
df.index = inputs
print('starting writing to disk...')
df.T.to_csv('output.csv')
對於兩個文件你可以用'貓example1.csv example2.csv行名字|剪下-d「」-f 1 |排序-u'。 AFAIK'sort'非常快速和高效,如果沒有足夠的內存使用磁盤。反覆將這個過程應用於文件對可能是一個好方法。 –
你的意思是首先獲得所有行的聯合,然後遍歷每個文件?我仍然需要哪些文件包含哪些功能的信息。 – zyxue
可以使用hdfs + spark?作爲可擴展性,它似乎是好工具? –