2016-10-11 18 views
1

我在multiprocessing上閱讀了python文檔。如何讓兩個mapper函數平行運行?

但我不得不像下面的要求:

from multiprocessing import Pool 
import time 

def f(x): 
    print(x) 
    time.sleep(100000000000000); 
    return x*x 

def f2(x): 
    print('**' + str(x) + '**') 
    time.sleep(100000000000000); 
    return x*x*x; 
if __name__ == '__main__': 
    p = Pool(5) 
    print(p.map(f, [1, 2, 3])) 
    print(p.map(f2,[1,2,3])) 

我在函數f阻塞用例(在我的情況下,它監聽到RabbitMQ的隊列,所以應該會阻止我使用長時間延遲-100000000000000來暗示無限時間阻塞)。 我用這裏的time.delay模擬了阻塞。

但需要的是我想f2(這是不同於f)開始並行運行。目前它甚至不會像f本身一樣阻止f2。

有人可以請給出一些關於如何使f和f2平行啓動的指導,儘管事實上f和f2都是阻塞的。

更新:

好像我找到了一種方法,但如何重新分配,即使它RES1工作以及它調用甚至˚F後不解

from multiprocessing import Pool 
import time 

def f(x): 
    print(x) 
    time.sleep(100000000000000); 
    return x*x 

def f2(x): 
    print('**' + str(x) + '**') 
    time.sleep(100000000000000); 
    return x*x*x; 

if __name__ == '__main__': 
    p = Pool(5) 
    res = p.apply_async(f, [2]) 
    res = p.apply_async(f2,[4]) 
    res.get() 

回答

0

您的方法不起作用,因爲第一個池必須完成才能在之前打印第二個池之前的結果。所以print種阻止第二個池開始。

我用一個線程池同時啓動兩個包裝,並且我異步獲得結果。我不得不做一個循環,而不是大的sleep,因爲python抱怨睡眠太大。但相當於同樣的事情。 在我的情況下,打印輸出爲1,2,3 ,,一次。如果我將等待循環設置爲合理的值,我甚至可以得到一些正確的結果。

from multiprocessing import Pool 
import concurrent.futures 


import time 

sleep_times = 1000000 

def f(x): 
    print(x) 
    for i in range(sleep_times): 
     time.sleep(10) 
    return x*x 

def f2(x): 
    print('**' + str(x) + '**') 
    for i in range(sleep_times): 
     time.sleep(10) 
    return x*x*x; 

def fwrap(l): 
    p = Pool(len(l)) 
    return(p.map(f, l)) 

def fwrap2(l): 
    p = Pool(len(l)) 
    return(p.map(f2, l)) 

if __name__ == '__main__': 
    jr = dict() 
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: 
     jr[executor.submit(fwrap, (1, 2, 3))] = "f" 
     jr[executor.submit(fwrap2, (1, 2, 3))] = "f2" 

    for future in concurrent.futures.as_completed(jr): 
     ident = jr[future] 
     print(ident,future.result()) 

輸出(當睡眠循環降低到1,以避免等待太長)

1 
**1** 
2 
**2** 
**3** 
3 

那裏它等待一段時間(已被 「處理時間」),那麼:

('f', [1, 4, 9]) 
('f2', [1, 8, 27]) 
+0

嗨,我apply_async下的代碼似乎工作..我不知道如何。你能指導嗎? –

+0

'apply_async'只適用於1個值。您似乎想要使用列表作爲輸入來構建列表。 'apply_async'不會那樣做。您必須重新輸入輸入順序的結果順序。 –

+0

不,我想讓f和f2開始,一個不應該阻止另一個,這是正確的? –