2017-02-01 83 views
0

背景: 我有一個巨大的文件.txt我必須處理。這是一個data mining項目。 所以我把它拆分了許多.txt文件每一個100MB大小,保存他們都在同一個目錄,並設法在他們這樣說:的Python - 多和文本文件處理

from multiprocessing.dummy import Pool

for filename in os.listdir(pathToFile): 
    if filename.endswith(".txt"): 
     process(filename) 
    else: 
     continue 

在過程中,我解析文件到對象列表中,然後我應用另一個函數。這是SLOWER比按原樣運行整個文件。但對於足夠大的文件,我將無法立即運行,我將不得不分片。所以我想要有線程,因爲我不必等待每個process(filename)完成。

我如何申請呢?我檢查this,但我不知道如何將它應用到我的代碼...

任何幫助,請將不勝感激。 我看了here看看如何做到這一點。我已經試過:

pool = Pool(6) 
for x in range(6): 
    futures.append(pool.apply_async(process, filename)) 

不幸的是,我意識到這隻會做前6個的文本文件,或將不是嗎?我怎樣才能使它工作?一旦一個線程結束,爲其分配另一個文件文本並開始運行。

編輯:

for filename in os.listdir(pathToFile): 
    if filename.endswith(".txt"): 
     for x in range(6): 
      pool.apply_async(process(filename)) 
    else: 
     continue 
+0

在循環中傳遞所有文件名。 6意味着6個文件將被同時處理。但不知道你會因爲Python GIL和線程而獲得速度。你應該看看多處理。 –

+0

您是在談論線程池還是進程池? – roganjosh

+0

@roganjosh,它是相同的程序,所以它必須是線程,不是嗎? –

回答

2

首先,使用multiprocessing.dummy只會給你的速度增加,如果你的問題是IO綁定(讀取文件時的主要瓶頸),爲CPU密集型任務(處理文件是瓶頸)它不會幫助,在這種情況下,你應該使用「真實」multiprocessing

您所描述的問題似乎更適合使用的Poolmap功能之一:

from multiprocessing import Pool 
files = [f for f in os.listdir(pathToFile) if f.endswith(".txt")] 
pool = Pool(6) 
results = pool.map(process, files) 
pool.close() 

這將使用6個工作進程處理文件的列表,並返回返回值的列表process()函數處理完所有文件後。您當前的示例將提交相同的文件6次。

+0

不錯,簡單的答案。你不需要'關閉()'和'加入()'池來訪問結果嗎? – roganjosh

+0

我沒有文件列表。我在'os.list ...'中使用'for filename來訪問特定文件夾中的所有'.txt'文件。 –

+0

@roganjosh否,當使用'map()'的時候,你不要使用'join()',因爲當它返回時,所有的worker都已經完成了他們的任務。調用'close()'允許工人終止,所以這是一個很好的習慣,thx的提示。 – mata