2015-12-03 20 views
2

我想並行化一個文件過濾操作,其中每個過濾器是一個大的正則表達式,因此整個事情需要時間來運行。該文件本身大約100GB。單進程的版本是這樣的:python使用多進程來過濾海量文件

def func(line): 
    # simple function as an example 
    for i in range(10**7): 
     pass 
    return len(line) % 2 == 0 


with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr: 
    for line in input: 
     if func(line): 
      out_sr.write(line) 

我嘗試使用multiprocessingimap但給人ValueError: I/O operation on closed file.我認爲迭代被複制到每一個過程,但不是所有的進程有處理打開。

有沒有辦法做到這一點使用multiprocessing,最好是利用池?

+0

如果__name__ =='__main __':'?你必須保持線條的秩序? – eph

+0

@eph是的行必須與輸入文件的順序相同。在我的真實代碼中,'with'是函數中的某個地方。 – simonzack

+0

什麼是你的文件和正則表達式?在命令行或其他文件處理工具上使用awk會更容易嗎? – DainDwarf

回答

1

我沒有錯誤運行下面的代碼。確保您不要在with聲明之外撥打in_srout_sr

from multiprocessing import Pool 

def func(line): 
    # simple function as an example 
    for i in xrange(10**7): 
     pass 
    return len(line) % 2 == 0, line 

def main(): 
    with open('input.txt','r') as in_sr, open('output.txt', 'w') as out_sr: 
     pool = Pool(processes=4) 
     for ret,line in pool.imap(func, in_sr, chunksize=4): 
      if ret: 
       out_sr.write(line) 
     pool.close() 

if __name__ == '__main__': 
    main() 
+0

奇怪,我只是試圖安裝python 3.5.0,它確實在那裏工作,我認爲我以前的python版本是buggy(它是3.4.x)。感謝您的答案,它絕對幫助我診斷問題! – simonzack

+0

Btw'contextlib。關閉'可以在這裏用作替代風格。 – simonzack

1

的代碼與此類似:

def func(line): 
    ... 

if __name__ == '__main__': 

    from multiprocessing import Pool 
    from itertools import tee, izip 

    pool = Pool(processes=4) 

    with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr: 
     lines1, lines2 = tee(in_sr) 
     for line, flag in izip(lines1, pool.imap(func, lines2)): 
      if flag: 
       out_sr.write(line) 
+0

由於ValueError,imap沒有工作,請參閱我的問題 – simonzack

+0

@simonzack我不認爲'ValueError'是由於'imap',如果只有行字符串作爲參數傳遞。 – eph