2015-02-07 49 views
0

我是多處理來處理一些非常大的文件。python multiprocessing shared Counter,酸洗錯誤

我可以使用在使用multiprocessing.BaseManager子類的進程之間共享的collections.Counter集合來統計特定字符串的出現次數。

雖然我可以分享櫃檯,似乎泡菜它似乎並沒有適當醃製。我可以將字典複製到一個新的字典,我可以泡菜。

我想了解如何避免共享計數器的「複製」,然後再選擇它。

這裏是我的(僞):

from multiprocessing.managers import BaseManager 
from collections import Counter 

class MyManager(BaseManager): 
    pass 

MyManager.register('Counter', Counter) 

def main(glob_pattern): 
    # function that processes files 
    def worker_process(files_split_to_allow_naive_parallelization, mycounterdict): 
     # code that loops through files 
     for line in file: 
      # code that processes line 
      my_line_items = line.split() 
      index_for_read = (my_line_items[0],my_line_items[6]) 
      mycounterdict.update((index_for_read,)) 

    manager = MyManager() 
    manager.start() 
    mycounterdict = manager.Counter() 

    # code to get glob files , split them with unix shell split and then chunk then 

    for i in range(NUM_PROCS): 
     p = multiprocessing.Process(target=worker_process , args = (all_index_file_tuples[chunksize * i:chunksize * (i + 1)],mycounterdict)) 
     procs.append(p) 
     p.start() 
    # Now we "join" the processes 
    for p in procs: 
     p.join() 

    # This is the part I have trouble with 
    # This yields a pickled file that fails with an error 
    pickle.dump(mycounterdict,open("Combined_count_gives_error.p","wb")) 

    # This however works 
    # How can I avoid doing it this way? 
    mycopydict = Counter() 
    mydictcopy.update(mycounterdict.items()) 
    pickle.dump(mycopydict,open("Combined_count_that_works.p","wb")) 

當我嘗試加載「醃」的錯誤給出文件這始終是一個較小的固定的大小,我得到一個錯誤,沒有任何意義。

如何在不通過上面僞代碼中創建的新鮮字典的情況下醃製共享字典。

>>> p = pickle.load(open("Combined_count_gives_error.p")) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1378, in load 
    return Unpickler(file).load() 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 858, in load 
    dispatch[key](self) 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 880, in load_eof 
    raise EOFError 
EOFError 
+0

。在你的代碼中的語法錯誤:'mycounterdict.update((index_for_read)' – 2015-02-07 18:25:33

+2

你的方法是我不會使用共享Counter對象,因爲通信和同步開銷實際上可能會限制單個工作進程的文件I/O性能。備選方法:讓單個工作者各自處理一個文件(或文件部分) (不要將原始數據發送給他們,讓他們從文件系統*讀取數據),讓每個工作人員創建一個計數器對象,將所有「子」計數器對象發送到您的主進程(通過隊列或管道),以及合併它們實際上,我最近在這裏提供了一個工作示例:http://stackoverflow.com/a/28097807/145400 – 2015-02-07 18:31:36

+0

@ Jan-PhilipGehrcke方法是你應該堅持的方法 - 當你進行多處理時,不要試圖擁有全局狀態 - 保持共享數據對於每個線程都是不可變的。爲了增加Jan的建議,我建議你先運行所有的子進程,然後讓主線程將它們合併到一起。 – Dyrborg 2015-02-07 19:12:08

回答

1

你的代碼有幾個問題。首先,如果你離開它,你不能保證關閉文件。其次,mycounterdict不是一個實際的Counter,而是一個替代品 - 醃製它,你會遇到很多問題,因爲它在這個過程之外是不可取的。但是,您不需要複製update.copy製作新的Counter副本。

因此,你應該使用

with open("out.p", "wb") as f: 
    pickle.dump(mycounterdict.copy(), f) 

至於這是否是一個很好的模式,答案是沒有。而不是使用一個共享的計數器,你應該在每個進程單獨計算,一個更簡單的代碼:

from multiprocessing import Pool 
from collections import Counter 
import pickle 

def calculate(file): 
    counts = Counter() 
    ... 
    return counts 

pool = Pool(processes=NPROCS) 
counts = Counter() 
for result in pool.map(calculate, files): 
    counts += result 

with open("out.p", "wb") as f: 
    pickle.dump(counts, f) 
+0

我喜歡你的答案,而且它的確與我在評論中已經鏈接的方法相同:http://stackoverflow.com/a/28097807/145400 ;-)。 – 2015-02-08 03:26:43