2016-11-15 65 views
0

我正在開發一個軟件,以使用不同方法(單線程,多線程,多進程)對某些腳本進行Python基準測試。所以我需要在不同的過程中執行相同的功能(使用相同的參數等)。將函數作爲參數傳遞給進程目標使用Pool.map()

如何將函數作爲參數傳遞給進程目標?

我目前的理解是,對函數的引用無法工作,因爲引用的函數對其他進程不可見,這就是爲什麼我嘗試使用共享內存的自定義管理器。

這裏的簡化代碼:

#!/bin/python 

from multiprocessing import Pool 
from multiprocessing.managers import BaseManager 
from itertools import repeat 

class FunctionManager(BaseManager): 
    pass 

def maFunction(a, b): 
    print(a + b) 

def threadedFunction(f_i_args): 
    (f, i, args) = f_i_args 
    f(*args) 

FunctionManager.register('Function', maFunction) 

myManager = FunctionManager() 
myManager.start() 

myManager.Function(0, 0) # Test 1 
threadedFunction((maFunction, 0, (1, 1))) # Test 2 

p = Pool() 
args = zip(repeat(myManager.Function), range(10), repeat(2, 2)) 
p.map(threadedFunction, args) # Does not work 
p.join() 

myManager.shutdown() 

目前酸洗錯誤在 「p.map()」 是這樣的:

2 
0 
Traceback (most recent call last): 
    File "./test.py", line 27, in <module> 
    p.map(threadedFunction, args) # Does not work 
    File "/usr/lib/python3.5/multiprocessing/pool.py", line 260, in map 
    return self._map_async(func, iterable, mapstar, chunksize).get() 
    File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get 
    raise self._value 
    File "/usr/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks 
    put(task) 
    File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send 
    self._send_bytes(ForkingPickler.dumps(obj)) 
    File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps 
    cls(buf, protocol).dump(obj) 
_pickle.PicklingError: Can't pickle <class 'weakref'>: attribute lookup weakref on builtins failed 

回答

1

我從運行代碼變得有點不同的錯誤。我認爲你的關鍵問題是你將一個函數傳遞給FunctionManager.register()而不是一個類。我也必須刪除您的zip文件才能使其工作並手動創建列表,但您可以修復此問題。這只是一個例子。

下面的代碼工作,並使用您的確切結構做一些事情。我會做這個有點不同,不使用BaseManager,但我認爲你有你的理由。

#!/usr/bin/python3.5 

from multiprocessing import Pool 
from multiprocessing.managers import BaseManager 
from itertools import repeat 

class FunctionManager(BaseManager): 
    pass 


class maClass(object): 
    def __init__(self): 
     pass 
    def maFunction(self,a, b): 
     print(a + b) 

def threadedFunction(f_i_args): 
    (f, i, args) = f_i_args 
    f(*args) 

FunctionManager.register('Foobar', maClass) 

myManager = FunctionManager() 
myManager.start() 
foobar = myManager.Foobar() 

foobar.maFunction(0, 0) # Test 1 
threadedFunction((foobar.maFunction, 0, (1, 1))) # Test 2 

p = Pool() 
#args = list(zip(repeat(foobar.maFunction), range(10), repeat(2, 2))) 
args = [] 
for i in range(10): 
    args.append([foobar.maFunction, i, (i,2)]) 


p.map(threadedFunction, args) # Does now work 
p.close() 
p.join() 

myManager.shutdown() 

還是我誤解了你的問題?

Hannu

+0

感謝您的回答。 maFunction是在頂層定義的,我無法改變它。我發現了一種似乎「可以工作」的方式:http://pastebin.com/GNB7ZybS。但是通過你的解決方案(因此也是一個鏈接),所有的進程一起不使用多於一個邏輯核心。 CPU使用率保持在12.5%(我有8個邏輯核心)。也許它是經理相關的?你會如何改變? –

+0

其實,這裏的問題是可重現的:http://pastebin.com/56ALQqJk –

+0

你碰巧在你的代碼中導入numpy嗎? – Hannu

相關問題