2012-10-24 109 views
6

使用map_async()時,我似乎無法讓我的回調工作。當我使用略有修改的代碼循環遍歷我的數組,通過apply_async()添加任務時,它可以工作。從文檔看來我應該能夠使用與map_async回調(),但也許它某種福利局錯誤的...多處理.pools.map_async無法正常工作的Python回調?

from multiprocessing import Pool,TimeoutError 
from time import sleep 

servers=["s1","s2","s3","s4","s5","s6"] 

def f(x): 
    print("start f(" + x + ")") 
    sleep(5) 
    print("end f(" + x + ")") 
    return "did " + x 

def mycallback(x): 
    print("My callback " + str(x)) 

def myerrorcallback(r): 
    print("My errorcallback " + str(r)) 

if __name__ == '__main__': 
    pool = Pool(processes=4) 
    results = pool.map_async(f, servers, chunksize=1, callback=mycallback, error_callback=myerrorcallback) 
    print(results.get(timeout=11)) 

在運行的時候,我得到:

D:\python> f.py 
start f(s1) 
start f(s2) 
start f(s3) 
start f(s4) 
end f(s1) 
start f(s5) 
end f(s2) 
start f(s6) 
end f(s4) 
end f(s3) 
end f(s5) 
end f(s6) 
['did s1', 'did s2', 'did s3', 'did s4', 'did s5', 'did s6'] 

當我使用使用apply_async()修改代碼,而是從回調中獲得打印輸出。修改後的代碼是最後一部分只是更改爲:

if __name__ == '__main__': 
    pool = Pool(processes=4) 
    for server in servers: 
     pool.apply_async(f, (server,), callback=mycallback, error_callback=myerrorcallback) 
    pool.close() 
    pool.join() 

結果:

D:\python\>fb.py 
start f(s1) 
start f(s2) 
start f(s3) 
start f(s4) 
end f(s1) 
start f(s5) 
My callback did s1 
end f(s2) 
My callback did s2 
start f(s6) 
end f(s3) 
My callback did s3 
end f(s4) 
My callback did s4 
end f(s5) 
My callback did s5 
end f(s6) 
My callback did s6 
+0

我無法重現你的輸出,在我的機器上輸出一個額外的行「我的回調[‘做S1’,‘S2做’,‘做了S3’,‘沒S4’,‘做S5’ ,'s6']「。你能發佈你的「修改」代碼嗎? – del

+0

哪個版本的python?我在窗口上使用3.3。 – Bbb

+0

我也編輯了我的問題以添加可以工作的代碼。 – Bbb

回答

2

好,我花了一個機會,並記錄一個錯誤吧。結果發現它實際上是3.3中的一個bug,並且一個補丁正在進行中。

http://bugs.python.org/issue16307

1

python 3.4.3,回調被稱爲與積累的結果,但回調的返回值被忽略。重點是什麼?

from multiprocessing import Pool 

data = [1, 2, 3] 

def calc(x): 
    return 2*x 

def increment(results): 
    print('callback called') 
    return [x+100 for x in results] 

with Pool(None) as pool: #None => calls cpu.count() 
    results = pool.map_async(calc, data, callback=increment) 
    print(results.get()) 

--output:-- 
callback called 
[2, 4, 6]