6

Python multiprocessing concurrency not working with Manager, Pool and a shared list

I am learning Python multiprocessing, and I am trying to use it to populate a list with all the files present on the filesystem. However, the code I wrote executes only sequentially.

#!/usr/bin/python
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]  # top-level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()


def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]

for i in mp:
    i.start()
    i.join()
print len(files)

When I check the process tree, I can see that only a single child process was spawned. (man pstree says {} denotes a child process spawned by its parent.)

---bash(10949)---python(12729)-+-python(12730)---{python}(12752) 
           `-python(12750)

What I was expecting was a process spawned for each tld directory, all populating the shared list files — around 10-15 processes depending on the number of directories. What am I doing wrong?

EDIT:

I then used multiprocessing.Pool to create the workers, and this time the processes are spawned, but I get errors when I try to use multiprocessing.Pool.map(). I was referring to the following code in the Python docs:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

Following that example, I rewrote the code as:

import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

and it does fork multiple processes.

---bash(10949)---python(12890)-+-python(12967) 
           |-python(12968) 
           |-python(12970) 
           |-python(12971) 
           |-python(12972) 
           ---snip--- 

But the code errors out with:

Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'get_files'

(the same traceback is printed, interleaved, by each of the other pool workers)

What am I doing wrong here, and why does the get_files() function error out?

Answer

3

This is simply because you instantiate the pool before defining the function get_files:

import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()

files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld))  # Instantiate the pool here

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

The general idea with processes is that at the instant you start them, you fork the memory of the main process. So any definition made in the main process after the fork will not exist in the subprocesses.

If you want shared memory, you can use the threading library instead, but you will have some issues with it (cf: the global interpreter lock).

+0

Thank you, this works. But I would like to know: since tld was already defined, why does it matter that the pool is defined after the function? There is no mention of the function when the pool is defined. – nohup

+0

There is one in your 'pool.map' :). By using 'pool.map', you make your processes use the 'get_files' function. – FunkySayu

+0

Agreed. But 'pool.map' is called after the function is defined, even though the pool itself was defined before it; according to the Python docs, defining the pool only sets the number of worker processes to spawn: 'pool = Pool(processes=4) # start 4 worker processes'. Please correct me where I misunderstood. Once again, thank you very much @FunkySayu – nohup