2013-05-14 84 views
5

異步子我有產卵與超時異步子流程在Python 3與超時

我想達到什麼問題:我想產卵多個進程異步無需等待結果,但我想也可以放心,每個產生的進程將在給定的超時內結束。

我在這裏發現了類似的問題:Using module 'subprocess' with timeoutAsynchronous background processes in Python?但他們沒有解決我的問題。

我的代碼看起來像這樣。我有一個命令類的建議在Using module 'subprocess' with timeout

class Command(object): 
    def __init__(self, cmd): 
    self.cmd = cmd 
    self.process = None 

    def run(self, timeout): 
    def target(): 
     print('Thread started') 
     args = shlex.split(self.cmd) 
     self.process = subprocess.Popen(args, shell=True) 
     self.process.communicate() 
     print('Thread finished') 

    thread = threading.Thread(target=target) 
    thread.start() 

    thread.join(timeout) 
    if thread.is_alive(): 
     print('Terminating process') 
     self.process.terminate() 
     thread.join() 

,然後當我想產卵子流程:當我運行此輸出似乎等待每個命令產卵和結束

for system in systems: 
    for service in to_spawn_system_info: 
    command_str = "cd {0} && python proc_ip.py {1} {2} 0 2>>{3}".format(home_dir, 
     service, system, service_log_dir) 
    command = Command(command_str) 
    command.run(timeout=60) 

。我得到

Thread started 
Thread finished 
Thread started 
Thread finished 
Thread started 
Thread finished 
Thread started 
Thread finished 

所以我的問題是我做錯了什麼?現在我開始懷疑是否有可能產生一個進程並通過超時限制它的執行。

爲什麼我需要這個? spawner腳本將在cron中運行。它將每隔10分鐘執行一次,它必須產生大約20個子進程。我想保證每個子進程都會在腳本從cron重新運行之前結束。

回答

3

如前所述,對process.communicate()的調用使您的代碼等待子進程的完成。但是,如果你只是刪除了communications()調用,那麼線程在產生進程後會立即退出,導致你的線程。join()調用過早退出,你會過早地關閉子進程。要做到你想要什麼,而不輪詢或忙等待,您可以設置超時後,計時器,將終止該進程(和亞軍線程)如果進程尚未完成:

class Command(object): 
    def __init__(self, cmd): 
    self.cmd = cmd 
    self.process = None 

    def run(self, timeout): 
    def target(): 
     print('Thread started') 
     # May want/need to skip the shlex.split() when using shell=True 
     # See Popen() constructor docs on 'shell' argument for more detail. 
     args = shlex.split(self.cmd) 
     self.process = subprocess.Popen(args, shell=True) 
     self.timer.start() 
     self.process.wait() 
     self.timer.cancel() 

    def timer_callback(): 
     print('Terminating process (timed out)') 
     self.process.terminate() 

    thread = threading.Thread(target=target) 
    self.timer = threading.Timer(timeout, timer_callback) 
    thread.start() 
+0

當我嘗試這個解決方案時,它不會在超時後終止我的線程。我將超時設置爲1秒,並在目標函數中添加time.sleep(1)。沒有線程被終止。 – sebast26 2013-05-14 13:20:11

+0

Hrmm。當target()退出時,線程應該終止。請記住,如上所述,如果過程正常退出而不超時,則不會得到打印輸出。我會仔細觀察一下,我可能忽略了一些東西。 – mshildt 2013-05-14 13:29:15

+0

因此,如果線程在子進程完成之前終止,子進程將終止?這與unutbu說的相反。他說,「那麼,即使你的線程完成後,你產生的每個子進程都將繼續存在」。我還有一個印象,就是子進程會繼續下去。 – b10hazard 2013-05-14 13:36:29

0
from threading import * 
from time import time 
import shlex 
import subprocess 
from random import randint 
class Worker(Thread): 
    def __init__(self, param, cmd, timeout=10): 
     self.cmd = cmd 
     self.timeout = timeout 

     Thread.__init__(self) 
     self.name = param 
    def run(self): 
     startup = time() 
     print(self.name + ' is starting') 

     args = shlex.split(self.cmd) 
     #Shell should be false when given a list (True for strings) 
     process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) 

     while time()-startup <= self.timeout: 
      if process.poll() != None: 
       break 

     process.stdout.close() 
     process.stdin.close() 
     process.stderr.close() 

     print(self.name + ' is dead') 

for i in range(0, 100): 
    x = Worker('Name-'+str(i), 'ping -n ' + str(randint(0,5)) + ' www.google.se') 
    x.start() 

while len(enumerate()) > 1: 
    pass # Wait for the threads to die 

這可以簡化您的工作方法嗎? 特別是考慮到你不需要等待一個結果,這隻會啓動一個類對象進入外層空間,執行超時工作。

還要注意:

  • 不關閉標準輸出,標準輸入和標準錯誤會導致「對於許多文件句柄開放」幾乎所有的系統
  • 作爲另一個答案中指出,.communicate()等待處理退出(使用.poll()代替)
1

使用相互獨立開始和結束的線程。如果您知道所有要提前運行的命令,此方法將非常有用。下面是一個例子...

from threading import Thread 
import subprocess 
import Queue 
import multiprocessing 

class Command(object): 
    def __init__(self, cmds): 
     self.cmds = cmds 

    def run_cmds(self): 
     cmd_queue = Queue.Queue() 
     for cmd in self.cmds: 
      cmd_queue.put(cmd) 

     available_threads = multiprocessing.cpu_count() 
     for x in range(0,available_threads): 
      t = Thread(target=self.run_cmd,args=(cmd_queue,)) 
      t.setDaemon(True) 
      t.start() 

     cmd_queue.join() 


    def run_cmd(self, cmd_queue): 
     while True: 
      try: cmd = cmd_queue.get() 
      except: break 
      print 'Thread started' 
      process = subprocess.Popen(cmd, shell=True) 
      process.communicate() 
      print 'Thread finished' 
      cmd_queue.task_done() 


# create list of commands you want to run 
cmds = ['cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop'] 
# create class 
c = Command(cmds) 
# run them... 
c.run_cmds() 

這將打印....

Thread started 
Thread started 
Thread started 
Thread startedThread finished 

Thread started 
Thread finishedThread finished 

Thread finished 
Thread finished 

正如你可以從輸出的子過程開始看到彼此獨立的結束,並沒有子等待另一個子進程完成,因爲它們都在不同的線程中調用。當然,你可以添加超時時間和其他任何你想要的,這只是一個簡單的例子。這假定你知道你想要運行的所有命令。如果你想添加線程超時,請參閱epicbrews答案。如果你願意的話,你可以把他的線程超時例子加入到這個例子中。

+0

正如我在我的例子? :P Altho我沒有把它描述得像你一樣乾淨。 – Torxed 2013-05-14 12:17:46

+0

實際上,當我編寫我的答案時,您的示例中有process.communicate()。否則我不會回答。我在編輯歷史記錄中看到您刪除了它。 – b10hazard 2013-05-14 12:30:55

+0

Yepp,但我儘可能早地刪除了它,因爲我之前只是在他的代碼中粘貼了他的代碼,以便在我的連接斷開之前(在火車上,所以它的DC每2分鐘):) – Torxed 2013-05-14 12:48:27