2012-06-15 198 views
9

使用Pool.apply_async運行大量任務(使用大參數)時,會分配進程並進入等待狀態,並且等待進程的數量沒有限制。這可以通過吃所有的記憶結束,如下面的例子:Python多處理:如何限制等待進程的數量?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

我在尋找一種方式來限制等待隊列,以這樣一種方式,只是在那裏等待處理的數量有限,並且Pool.apply_async在等待隊列已滿時被阻塞。

+0

不錯例子(+1)。 – mgilson

回答

6

multiprocessing.Pool有一個_taskqueue類型的成員multiprocessing.Queue,它需要一個可選的maxsize參數;不幸的是它構造它沒有maxsize參數集。

我建議將multiprocessing.Poolmultiprocessing.Pool.__init__的副本粘貼maxsize_taskqueue的構造函數。

猴修補的對象(游泳池或隊列)也將工作,但你必須猴補丁pool._taskqueue._maxsizepool._taskqueue._sem所以這將是非常脆弱:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

我正在使用Python 2.7.3,而_taskqueue的類型是Queue.Queue。這意味着它是一個簡單的Queue,而不是一個multiprocessing.Queue。子類化multiprocessing.Pool和覆蓋__init__工作正常,但猴子補丁對象不能按預期工作。但是,這是我正在尋找的黑客,謝謝。 –

0

你可以增加明確的隊列在這種情況下使用最大參數並使用queue.put()而不是pool.apply_async()。然後,工作進程可能:

for a, b in iter(queue.get, sentinel): 
    # process it 

如果要限制創建的輸入參數/結果是在內存中活動的工作流程的大致數數,那麼你可以使用pool.imap*()方法:

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

使用'imap'沒有什麼區別。輸入隊列仍然是無限的,使用這個解決方案最終會吃掉所有的內存。 – Radim

+0

@Radim:即使您給它一個無限生成器,答案中的「imap」代碼也能正常工作。 – jfs

+0

不在Python 2中,不幸的是(沒有看過py3中的代碼)。對於一些解決方法,請參閱[這個SO答案](http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration)。 – Radim

1

如果pool._taskqueue超出所需尺寸,則等待:

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

只是想補充一點,我發現這是針對我的多處理內存問題的最簡單的解決方案。我使用max_apply_size = 10,對我的問題工作正常,這是一個緩慢的文件轉換。使用信號量作爲@ecatmur建議似乎是一個更強大的解決方案,但對於簡單的腳本可能是矯枉過正的。 – Nate

相關問題