我是多處理來處理一些非常大的文件。python multiprocessing shared Counter,酸洗錯誤
我可以使用在使用multiprocessing.BaseManager子類的進程之間共享的collections.Counter集合來統計特定字符串的出現次數。
雖然我可以分享櫃檯,似乎泡菜它似乎並沒有適當醃製。我可以將字典複製到一個新的字典,我可以泡菜。
我想了解如何避免共享計數器的「複製」,然後再選擇它。
這裏是我的(僞):
from multiprocessing.managers import BaseManager
from collections import Counter
class MyManager(BaseManager):
pass
MyManager.register('Counter', Counter)
def main(glob_pattern):
# function that processes files
def worker_process(files_split_to_allow_naive_parallelization, mycounterdict):
# code that loops through files
for line in file:
# code that processes line
my_line_items = line.split()
index_for_read = (my_line_items[0],my_line_items[6])
mycounterdict.update((index_for_read,))
manager = MyManager()
manager.start()
mycounterdict = manager.Counter()
# code to get glob files , split them with unix shell split and then chunk then
for i in range(NUM_PROCS):
p = multiprocessing.Process(target=worker_process , args = (all_index_file_tuples[chunksize * i:chunksize * (i + 1)],mycounterdict))
procs.append(p)
p.start()
# Now we "join" the processes
for p in procs:
p.join()
# This is the part I have trouble with
# This yields a pickled file that fails with an error
pickle.dump(mycounterdict,open("Combined_count_gives_error.p","wb"))
# This however works
# How can I avoid doing it this way?
mycopydict = Counter()
mydictcopy.update(mycounterdict.items())
pickle.dump(mycopydict,open("Combined_count_that_works.p","wb"))
當我嘗試加載「醃」的錯誤給出文件這始終是一個較小的固定的大小,我得到一個錯誤,沒有任何意義。
如何在不通過上面僞代碼中創建的新鮮字典的情況下醃製共享字典。
>>> p = pickle.load(open("Combined_count_gives_error.p"))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1378, in load
return Unpickler(file).load()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 880, in load_eof
raise EOFError
EOFError
。在你的代碼中的語法錯誤:'mycounterdict.update((index_for_read)' – 2015-02-07 18:25:33
你的方法是我不會使用共享Counter對象,因爲通信和同步開銷實際上可能會限制單個工作進程的文件I/O性能。備選方法:讓單個工作者各自處理一個文件(或文件部分) (不要將原始數據發送給他們,讓他們從文件系統*讀取數據),讓每個工作人員創建一個計數器對象,將所有「子」計數器對象發送到您的主進程(通過隊列或管道),以及合併它們實際上,我最近在這裏提供了一個工作示例:http://stackoverflow.com/a/28097807/145400 – 2015-02-07 18:31:36
@ Jan-PhilipGehrcke方法是你應該堅持的方法 - 當你進行多處理時,不要試圖擁有全局狀態 - 保持共享數據對於每個線程都是不可變的。爲了增加Jan的建議,我建議你先運行所有的子進程,然後讓主線程將它們合併到一起。 – Dyrborg 2015-02-07 19:12:08