2012-09-26 34 views
1

我需要實現一個系統,其中包含一個主進程,用於管理執行其他任務的從進程。我有兩種不同的從機類型,每個從機需要6個實例。我寫了一些可以工作的東西,但是它會殺死每個進程,並在任務完成時啓動一個新進程。這是不可取的,因爲產生新的過程是昂貴的。我希望保持每個從站作爲一個進程運行,並在完成時收到通知,並用新的輸入重新運行。在Python中管理固定數量的工作人員

我目前的僞代碼如下。它並不完美;因爲我沒有與我的實際代碼,所以我正在進行討論。

# SlaveTypeB is pretty much the same. 
class SlaveTypeA(multiprocessing.Process): 
    def __init__(self, val): 
     self.value = val 
     self.result = multiprocessing.Queue(1) 
     self.start() 
    def run(self): 
     # In real life, run does something that takes a few seconds. 
     sleep(2) 
     # For SlaveTypeB, assume it writes self.val to a file instead of incrementing 
     self.result.put(self.val + 1) 
    def getResult(self): 
     return self.result.get()[0] 


if __name__ == "__main__": 
    MAX_PROCESSES = 6 
    # In real life, the input will grow as the while loop is being processed 
    input = [1, 4, 5, 6, 9, 6, 3, 3] 
    aProcessed = [] 
    aSlaves = [] 
    bSlaves = [] 

    while len(input) > 0 or len(aProcessed) > 0: 
     if len(aSlaves) < MAX_PROCESSES and len(input) > 0: 
      aSlaves.append(SlaveTypeA(input.pop(0)) 
     if len(bSlaves) < MAX_PROCESSES and len(aProcessed) > 0 : 
      bSlaves.append(SlaveTypeB(aProcesssed.pop(0)) 
     for aSlave in aSlaves: 
      if not aSlave.isAlive(): 
       aProcessed = aSlave.getResult() 
       aSlaves.remove(aSlave) 
     for bSlave in bSlaves: 
      if not bSlave.isAlive(): 
       bSlaves.remove(bSlave) 

我該怎麼做才能讓slaves和bSlaves中的進程不會被殺死並重新生成。我在想我可以使用管道,但是我不知道如何知道流程何時完成,而無需等待。

編輯 我重寫了這個使用管道,它解決了我的問題,無法保持進程運行。仍然希望以最好的方式進行輸入。我忽略了slaveB部分,因爲只有一個工人類型可以簡化問題。

class Slave(Process) 
    def __init__(self, id): 
     # Call super init, set id, set idlestate = true, etc 
     self.parentCon, self.childCon = Pipe() 
     self.start() 

    def run(self): 
     while True: 
      input = self.childCon.recv() 
      # Do something here in real life 
      sleep(2) 
      self.childCon.send(input + 1) 

    #def isIdle/setIdle(): 
     # Getter/setter for idle 

    def tryGetResult(self):    
     if self.parentCon.poll(): 
      return self.parentCon.recv() 
     return False 

    def process(self, input): 
     self.parentConnection.send(input) 

if __name__ == '__main__' 
    MAX_PROCESSES = 6 
    jobs = [1, 4, 5, 6, 9, 6, 3, 3] 
    slaves = [] 
    for int i in range(MAX_PROCESSES): 
     slaves.append(Slave(i)) 
    while len(jobs) > 0: 
     for slave in slaves: 
      result = slave.tryGetResult() 
      if result: 
       # Do something with result 
       slave.setIdle(True) 
      if slave.isIdle(): 
       slave.process(jobs.pop()) 
       slave.setIdle(False) 

EDIT 2 得到它,請參閱下面回答。

+1

你看着'multiprocessing.Pool'? http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers – tMC

+0

我不認爲有可能讓池進程退出並重用它們,是嗎? –

回答

0

看起來像使用SyncManager是最好的選擇了這種情況。

類碩士(SyncManager): 通

輸入= [1,4,5,6,9,6,6,3,3] DEF getNextInput(): #檢查輸入ISN」噸空第一 返回input.pop()

如果名稱 == 「 」: MAX_PROCESSES = 6 Master.register(「 getNextInput」,getNextInput) 米=主(( '本地主機' ,5000)) m.start() 爲我的range(MAX_PROCESSES): 從() 而真: 通

class Slave(Process): 
    def __init__(self): 
     multiprocessing.Process.__init__(self) 
     self.start() 
    def run(self): 
     Master.register("getNextInput", getNextInput) 
     m = Master(('localhost', 5000)) 
     m.connect() 
     while True: 
      input = m.getNextInput() 
      # Check for None first 
      self.process(input) 
    def process(self): 
     print "Processed " + str(input) 
0

創建兩個隊列? 像worktodoAworktodoB,讓你的工人閒置,而他們等待某些東西被放入隊列中,如果放在那裏的東西讓我們說'退出',他們會退出?

否則,你應該給tMCs comment一個鏡頭

+0

這就是我現在正在做的事情。問題是我需要知道工人何時完成。 –

+0

另一個隊列呢?像「閒暇工作者」可能有點笨拙... – Willy

相關問題