2017-03-16 69 views
0

Extract specific lines from a big file: I have a large file (5 million lines) in the format:

'User ID,Mov ID,Rating,Timestamp' 

I also have a much smaller file (200,000 lines) with records of the form:

'User ID, Mov ID' 

I have to generate a new file such that if a (User ID, Mov ID) pair from the second file matches any of the 5 million records in the first file, that record must not appear in my new file. In other words, the new file consists only of those (User ID, Mov ID) records from the first file that do not appear in file2 (200,000 lines).

I am trying this simple approach, but it takes far too long. Is there a faster algorithm or implementation?:

from sys import argv 
import re 

script, filename1, filename2 = argv 

# open files
testing_small = open(filename1)    # small file: 'User ID, Mov ID'
ratings = open(filename2)          # large file: 'User ID,Mov ID,Rating,Timestamp'
# open file to write the data
ratings_training = open("ratings_training.csv", 'w') 

# for every rating line, rescan the whole small file looking for a matching pair
for line_rating in ratings: 
    flag = 0
    testing_small.seek(0) 
    for line_test in testing_small: 
        matched_line = re.match(line_test.rstrip(), line_rating) 
        if matched_line: 
            flag = 1
            break 
    if flag == 0: 
        ratings_training.write(line_rating) 

testing_small.close() 
ratings.close() 
ratings_training.close() 

I can use any Spark-based approach as well.

+0

Have you tried pandas/numpy? –

+0

Is this a CSV file? –

+0

Yes, and no, I don't know how to use numpy –

Answers

1

For example:

# df1: 
User_ID,Mov_ID,Rating,Timestamp 
sam,apple,0.6,2017-03-17 09:04:39 
sam,banana,0.7,2017-03-17 09:04:39 
tom,apple,0.3,2017-03-17 09:04:39 
tom,pear,0.9,2017-03-17 09:04:39 

# df2: 
User_ID,Mov_ID 
sam,apple 
sam,pear 
tom,apple 

In pandas:

import pandas as pd 

df1 = pd.read_csv('./disk_file')   # large file: User_ID,Mov_ID,Rating,Timestamp
df2 = pd.read_csv('./tmp_file')    # small file: User_ID,Mov_ID

# left merge with an indicator column, then keep the rows that exist only in df1
res = pd.merge(df1, df2, on=['User_ID', 'Mov_ID'], how='left', indicator=True) 
res = res[res['_merge'] == 'left_only'] 
print(res) 
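If the remaining records then need to go back to disk, a minimal follow-up sketch (the output filename is an assumption) drops the indicator column and writes a CSV:

# drop the merge indicator and write the surviving records (output name assumed)
res = res.drop('_merge', axis=1)
res.to_csv('ratings_training.csv', index=False)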

Or in Spark:

from pyspark import SparkConf
from pyspark.sql import SparkSession

cfg = SparkConf().setAppName('MyApp') 
spark = SparkSession.builder.config(conf=cfg).getOrCreate() 

df1 = spark.read.load(path='file:///home/zht/PycharmProjects/test/disk_file', format='csv', sep=',', header=True) 
df2 = spark.read.load(path='file:///home/zht/PycharmProjects/test/tmp_file', format='csv', sep=',', header=True) 

# left outer join, then keep only the rows of df1 that found no match in df2
res = df1.join(df2, on=[df1['User_ID'] == df2['User_ID'], df1['Mov_ID'] == df2['Mov_ID']], how='left_outer') 
res = res.filter(df2['User_ID'].isNull()) 
res.show() 
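The same anti-join can also be expressed more directly; a short sketch, assuming both DataFrames keep the column names shown above, uses Spark's left_anti join type:

# left_anti keeps only the rows of df1 with no matching (User_ID, Mov_ID) in df2
res = df1.join(df2, on=['User_ID', 'Mov_ID'], how='left_anti')
res.show()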
+0

For the Spark version, the IntelliJ IDE says 'cannot resolve join with such a signature'. Can you point to a reference example of how 'on' and 'how' are used? –

+0

Which version of Spark are you using? –

+0

I am using 2.1.0, with Scala 2.11.5 –

0

You should keep the smaller file entirely in memory. Then you process the large file line by line without holding the whole file in memory.

The code is untested:

# read the smaller filter file into a set of (user, movie) pairs
filter_pairs = set()
with open(reffile, "rt") as f:
    for line in f:
        user, movie = line.strip().split(",")
        filter_pairs.add((user, movie))

# scan the large file and write the records that are not filtered out
with open(outfile, "wt") as f_out:
    with open(bigfile, "rt") as f_in:
        for line in f_in:
            user, movie, _, _ = line.strip().split(",")
            if (user, movie) not in filter_pairs:
                print(",".join((user, movie)), file=f_out)