2014-05-03 180 views
2

多處理池在使用:如何暫停與執行

def myFunction(arg): 
    for i in range(10000): 
     pass 

from multiprocessing import Pool 
pool = Pool(processes=3) 
pool.map_async(myFunction, ['first','second','third']) 

我希望用戶能夠在任何給定時間的池開始後暫停多泳池的執行。然後,我希望用戶能夠取消暫停(繼續)游泳池中剩餘的項目。如何實現它?

編輯:

這裏是熱賣Blckknght建議工作落實。謝謝Blckknght!

import multiprocessing 
from PyQt4 import QtGui, QtCore 


def setup(event): 
    global unpaused 
    unpaused = event 

def myFunction(arg=None): 
    unpaused.wait() 
    print "Task started...", arg 
    for i in range(15000000): 
     pass 
    print '...task completed.', arg 


class MyApp(object): 

    def __init__(self): 
     super(MyApp, self).__init__() 

     app = QtGui.QApplication(sys.argv) 
     self.mainWidget = QtGui.QWidget() 
     self.mainLayout = QtGui.QVBoxLayout() 
     self.mainWidget.setLayout(self.mainLayout) 

     self.groupbox = QtGui.QGroupBox() 
     self.layout = QtGui.QVBoxLayout() 
     self.groupbox.setLayout(self.layout) 

     self.pauseButton = QtGui.QPushButton('Pause') 
     self.pauseButton.clicked.connect(self.pauseButtonClicked)  
     self.layout.addWidget(self.pauseButton) 

     self.okButton = QtGui.QPushButton('Start Pool') 
     self.okButton.clicked.connect(self.startPool) 
     self.layout.addWidget(self.okButton) 
     self.layout.addWidget(self.pauseButton)  

     self.mainLayout.addWidget(self.groupbox) 
     self.mainWidget.show() 
     sys.exit(app.exec_()) 


    def startPool(self): 
     self.event = multiprocessing.Event() 
     self.pool=multiprocessing.Pool(1, setup, (self.event,)) 
     self.result=self.pool.map_async(myFunction, [1,2,3,4,5,6,7,8,9,10]) 
     self.event.set() 
     # self.result.wait()  

    def pauseJob(self): 
     self.event.clear() 

    def continueJob(self): 
     self.event.set() 

    def pauseButtonClicked(self): 
     if self.pauseButton.text()=='Pause': 
      print '\n\t\t ...pausing job...','\n' 
      self.pauseButton.setText('Resume') 
      self.pauseJob() 
     else: 
      print '\n\t\t ...resuming job...','\n' 
      self.pauseButton.setText('Pause')    
      self.continueJob() 

if __name__ == '__main__': 
    MyApp() 

回答

7

聽起來好像你想用multiprocessing.Event來控制你的工作函數的運行。您可以創建一個,然後將其傳遞給池中的initializer,然後在myFunction中等待它。

下面是一個運行員工的例子,每秒鐘都會打印他們的參數。工作人員可以暫停活動,並由set重新開始。

from time import sleep 
import multiprocessing 

def setup(event): 
    global unpaused 
    unpaused = event 

def myFunction(arg): 
    for i in range(10): 
     unpaused.wait() 
     print(arg) 
     sleep(1) 

if __name__ == "__main__": 
    event = multiprocessing.Event() # initially unset, so workers will be paused at first 
    pool = multiprocessing.Pool(3, setup, (event,)) 
    result = pool.map_async(myFunction, ["foo", "bar", "baz"]) 
    event.set() # unpause workers 
    sleep(5) 
    event.clear() # pause after five seconds 
    sleep(5) 
    event.set() # unpause again after five more seconds 
    result.wait() # wait for the rest of the work to be completed 

的工作進程應該打印每個"foo""bar""baz"十次,以每次重複之間的一秒的延遲。工人在頭五秒後會暫停,並在五秒鐘後重新啓動。根據您的實際使用情況,可能有各種方法來改進此代碼,但希望這足以讓您朝着正確的方向前進。

+0

感謝您的建議!我剛剛編輯了一個你剛纔提到的原始問題。請讓我知道是否有任何我錯過了... – alphanumeric

+0

我很高興它爲你工作。如果您覺得我的回答有幫助,請考慮點擊旁邊的複選標記來接受它。 – Blckknght

+0

非常有幫助!再次感謝! – alphanumeric