2016-08-24 38 views
0

我使用python multiprocessing來分叉一些子進程來運行我的作業。有兩個要求:如何區分Multiprocessing.Pool中的進程?

  1. 我需要知道孩子進程的pid,以防萬一我想要殺死它。
  2. 我需要回調在作業完成後做一些東西。因爲這些東西在父進程中使用鎖,所以它不能在子進程中完成。

,但我得到:

  1. 過程中產生的by multiprocessing.Process()有一個屬性 「PID」 來獲得它的PID。但是我不能添加異步回調,當然我也不能等待同步。
  2. multiprocessing.Pool()生成的進程池提供了回調接口。但我無法分辨池中的哪個進程與我的工作相匹配,因爲我可能需要根據特定的工作來殺死進程。

任務是便宜,這裏顯示了代碼:

import random, time 
import multiprocessing 
import os 

class Job(object): 
    def __init__(self, jobid, jobname, command): 
     self.jobid, self.jobname, self.command = jobid, jobname, command 

    def __str__(self): 
     return "Job <{0:05d}>".format(self.jobid) 

    def __repr__(self): 
     return self.__str__() 

def _run_job(job): 
    time.sleep(1) 
    print "{} done".format(job) 
    return job, random.choice([True, False]) # the second argument indicates whether job has finished successfully 

class Test(object): 
    def __init__(self): 
     self._loc = multiprocessing.Lock() 
     self._process_pool = multiprocessing.Pool() 

    def submit_job(self, job): 
     with self._loc: 
      self._process_pool.apply_async(_run_job, (job,), callback=self.job_done) 
      print "submitting {} successfully".format(job) 

    def job_done(self, result): 
     with self._loc: 
      # stuffs after job has finished is related to some cleanning work, so it needs the lock of the parent process 
      job, success = result 
      if success: 
       print "{} success".format(job) 
      else: 
       print "{} failure".format(job) 


j1 = Job(1, "test1", "command1") 
j2 = Job(2, "test2", "command2") 
t = Test() 
t.submit_job(j1) 
t.submit_job(j2) 
time.sleep(3.1) # wait for all jobs finishing 

但現在我無法得到對應於每個作業的PID。例如,我需要終止作業< 1>,但是我找不到進程池中的哪個進程與作業< 1>相關,所以我可能無法在任何時候終止該作業。

如果我使用multiprocessing.Process或者,我可以記錄每個進程的PID與其相應的jobid。但我現在不能添加回調方法。

那麼有沒有一種方法來獲得子進程的PID和添加回調方法?

回答

0

最後我找到了一個解決方案:用multiprocessing.Event代替。

由於multiprocessing.Pool不能告訴我哪個進程被分配,所以我無法記錄它,以便我可以在任何時候根據作業id來殺死它。

幸運的是,multiprocessing提供了Event對象作爲回調方法的替代方法。回想一下回調方法的作用:它提供對子進程的異步響應。一旦子進程完成,父進程可以檢測到它並調用回調方法。所以核心問題是父進程如何檢測子進程是否已完成。那是Event對象。

所以解決方案很簡單:將一個Event對象傳遞給子進程。一旦子進程完成,它將設置Event對象。在父進程中,它啓動一個守護進程線程來監視事件是否被設置。如果是這樣,它可以調用那些回調函數的方法。而且,由於我用multiprocessing.Process而不是multiprocessing.Pool創建了進程,我可以很容易地獲得它的PID,這使我能夠殺死它。

解決方案代碼:

import time 
import multiprocessing 
import threading 

class Job(object): 
    def __init__(self, jobid, jobname, command): 
     self.jobid, self.jobname, self.command = jobid, jobname, command 
     self.lifetime = 0 

    def __str__(self): 
     return "Job <{0:05d}>".format(self.jobid) 

    def __repr__(self): 
     return self.__str__() 

def _run_job(job, done_event): 
    time.sleep(1) 
    print "{} done".format(job) 
    done_event.set() 

class Test(object): 
    def __init__(self): 
     self._loc = multiprocessing.Lock() 
     self._process_pool = {} 
     t = threading.Thread(target=self.scan_jobs) 
     t.daemon = True 
     t.start() 

    def scan_jobs(self): 
     while True: 
      with self._loc: 
       done_jobid = [] 
       for jobid in self._process_pool: 
        process, event = self._process_pool[jobid] 
        if event.is_set(): 
         print "Job<{}> is done in process <{}>".format(jobid, process.pid) 
         done_jobid.append(jobid) 
       map(self._process_pool.pop, done_jobid) 
      time.sleep(1) 


    def submit_job(self, job): 
     with self._loc: 
      done_event = multiprocessing.Event() 
      new_process = multiprocessing.Process(target=_run_host_job, args=(job, done_event)) 
      new_process.daemon = True 
      self._process_pool[job.jobid] = (new_process, done_event) 
      new_process.start() 
      print "submitting {} successfully".format(job) 


j1 = Job(1, "test1", "command1") 
j2 = Job(2, "test2", "command2") 
t = Test() 
t.submit_job(j1) 
t.submit_job(j2) 
time.sleep(5) # wait for job to finish 
+0

有趣。我有一個類似的問題 - 爲什麼不使用進程列表? – comodoro

相關問題