2013-01-05 36 views
21

我想創建一個類,比可以運行一個單獨的過程去做一些需要很長時間的工作,從主模塊啓動一堆這些,然後等待它們完成。我希望一次啓動這些流程,然後繼續爲他們提供要做的事情,而不是創建和銷燬流程。例如,也許我有10臺服務器運行dd命令,然後我希望它們都能夠scp文件等。如何在Python中對類實例使用多處理?

我的最終目標是爲每個系統創建一個類,以跟蹤系統的信息它與IP地址,日誌,運行時等綁定在一起。但該類必須能夠啓動系統命令,然後在該系統命令運行時將執行返回給調用方,以便稍後使用系統命令的結果進行後續處理。

我的嘗試失敗了,因爲我無法通過pickle將管道上的類的實例方法發送到子進程。那些不是可以醃製的。因此,我試圖以各種方式解決它,但我無法弄清楚。我的代碼如何補丁來做到這一點?如果你不能發送任何有用的信息,多處理有什麼好處?

是否有任何好的與類實例一起使用的多處理文檔?我可以讓多處理模塊工作的唯一方法是使用簡單的函數。每個在類實例中使用它的嘗試都失敗了。也許我應該通過事件呢?我不明白該怎麼做。

import multiprocessing 
import sys 
import re 

class ProcessWorker(multiprocessing.Process): 
    """ 
    This class runs as a separate process to execute worker's commands in parallel 
    Once launched, it remains running, monitoring the task queue, until "None" is sent 
    """ 

    def __init__(self, task_q, result_q): 
     multiprocessing.Process.__init__(self) 
     self.task_q = task_q 
     self.result_q = result_q 
     return 

    def run(self): 
     """ 
     Overloaded function provided by multiprocessing.Process. Called upon start() signal 
     """ 
     proc_name = self.name 
     print '%s: Launched' % (proc_name) 
     while True: 
      next_task_list = self.task_q.get() 
      if next_task is None: 
       # Poison pill means shutdown 
       print '%s: Exiting' % (proc_name) 
       self.task_q.task_done() 
       break 
      next_task = next_task_list[0] 
      print '%s: %s' % (proc_name, next_task) 
      args = next_task_list[1] 
      kwargs = next_task_list[2] 
      answer = next_task(*args, **kwargs) 
      self.task_q.task_done() 
      self.result_q.put(answer) 
     return 
# End of ProcessWorker class 

class Worker(object): 
    """ 
    Launches a child process to run commands from derived classes in separate processes, 
    which sit and listen for something to do 
    This base class is called by each derived worker 
    """ 
    def __init__(self, config, index=None): 
     self.config = config 
     self.index = index 

     # Launce the ProcessWorker for anything that has an index value 
     if self.index is not None: 
      self.task_q = multiprocessing.JoinableQueue() 
      self.result_q = multiprocessing.Queue() 

      self.process_worker = ProcessWorker(self.task_q, self.result_q) 
      self.process_worker.start() 
      print "Got here" 
      # Process should be running and listening for functions to execute 
     return 

    def enqueue_process(target): # No self, since it is a decorator 
     """ 
     Used to place an command target from this class object into the task_q 
     NOTE: Any function decorated with this must use fetch_results() to get the 
     target task's result value 
     """ 
     def wrapper(self, *args, **kwargs): 
      self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled! 
     return wrapper 

    def fetch_results(self): 
     """ 
     After all processes have been spawned by multiple modules, this command 
     is called on each one to retreive the results of the call. 
     This blocks until the execution of the item in the queue is complete 
     """ 
     self.task_q.join()       # Wait for it to to finish 
     return self.result_q.get()     # Return the result 

    @enqueue_process 
    def run_long_command(self, command): 
     print "I am running number % as process "%number, self.name 

     # In here, I will launch a subprocess to run a long-running system command 
     # p = Popen(command), etc 
     # p.wait(), etc 
     return 

    def close(self): 
     self.task_q.put(None) 
     self.task_q.join() 

if __name__ == '__main__': 
    config = ["some value", "something else"] 
    index = 7 
    workers = [] 
    for i in range(5): 
     worker = Worker(config, index) 
     worker.run_long_command("ls /") 
     workers.append(worker) 
    for worker in workers: 
     worker.fetch_results() 

    # Do more work... (this would actually be done in a distributor in another class) 

    for worker in workers: 
     worker.close() 

編輯:我試着移動ProcessWorker類和Worker類之外創建多隊列,然後試圖手動酸洗工人實例。即使這不起作用,我得到一個錯誤

RuntimeError: Queue objects should only be shared between processes through inheritance

。但我只是將這些隊列的引用傳遞給worker實例?我錯過了一些基本的東西。下面是主要的部分修改後的代碼:

if __name__ == '__main__': 
    config = ["some value", "something else"] 
    index = 7 
    workers = [] 
    for i in range(1): 
     task_q = multiprocessing.JoinableQueue() 
     result_q = multiprocessing.Queue() 
     process_worker = ProcessWorker(task_q, result_q) 
     worker = Worker(config, index, process_worker, task_q, result_q) 
     something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues?? 
     process_worker.start() 
     worker.run_long_command("ls /") 
+0

你見過['dispy'](http://dispy.sourceforge.net/)嗎?它可能會節省頭痛或兩個:) –

+2

我找不到任何使用類的dispy的例子。一切似乎從__main__運行,這不是我打算如何使用它。我使用multiprocessing.Process的例子在__main__中運行良好,但是當我嘗試使用狀態爲 –

+0

的類和方法時失敗了。我知道這在遊戲中很晚,但如果使用名爲'pathos.multiprocessing'的multiprocessing分支,可以輕鬆地醃製類實例。如果你需要使用'Queue'對象和其他東西,那麼你可以通過導入'from processing import Queue'來訪問增加的分叉'Queues'。 'pathos.multiprocessing'使用'dill',**做**序列化和發送類定義以及實例。 –

回答

8

而是試圖發送一個方法本身(這是不切實際的),嘗試發送一個名稱來執行方法的的。

假設每個員工運行相同的代碼,這是一個簡單的問題getattr(self, task_name)

我會通過元組(task_name, task_args),其中task_args是一個字典被直接輸送到任務的方法:

next_task_name, next_task_args = self.task_q.get() 
if next_task_name: 
    task = getattr(self, next_task_name) 
    answer = task(**next_task_args) 
    ... 
else: 
    # poison pill, shut down 
    break 
+1

這不起作用......我得到錯誤「AttributeError:'ProcessWorker'對象沒有屬性'run_long_command'」。我不希望這樣做,因爲ProcessWorker沒有Worker類中存在的方法。我想通過管道發送方法(包含狀態信息),以便遠程進程可以使用所有狀態信息。我真的沒有看到多進程模塊的重點,如果它只是在另一端運行無狀態函數。 –

+2

對不起,但我必須重複。您不能通過管道發送方法。這就是爲什麼'鹹菜'抱怨的原因。發送可執行代碼並不是不可能的,但它更多地涉及到僅僅反序列化代碼對象。你應該事先實現你想在Worker類中運行的方法。如果你確實需要發送未知的代碼,最好的方法是將Python源代碼作爲字符串發送,然後調用它的'compile'和'eval'。如果要發送帶有狀態的方法,請將所有狀態置於方法的參數中,或使用共享數據庫。 – 9000

+0

WRT運行無狀態方法:您有可以保持狀態的管道。您將初始狀態標記爲多個進程,然後收集結果。如果您想要高度共享的狀態(例如光線追蹤的幾何體),您可以使用(內存中)數據庫,任何從memcached到常規RDBMS的數據庫。使用全局_mutable_狀態通常是一個足夠糟糕的想法。如果您必須使用從管道讀取並解決衝突的仲裁進程(例如數據庫)。 – 9000

21

那麼,問題是,我是假設的Python在做某種魔力,與C++/fork()的工作方式有所不同。我以某種方式認爲Python只複製了類,而不是將整個程序複製到單獨的進程中。因爲所有關於pickle序列化的討論都讓我覺得它實際上已經把所有東西都發送到管道上了,所以我非常浪費時間試圖讓它工作。我知道某些東西不能通過管道傳送,但我認爲我的問題是我沒有正確包裝東西。

如果Python文檔給我一個10,000英尺的視圖,說明使用這個模塊會發生什麼,這一切都可以避免。當然,它告訴我多進程模塊的方法是什麼,給了我一些基本的例子,但是我想知道的是幕後的「操作理論」是什麼!這是我可以使用的那種信息。如果我的答案是關閉的,請留言。它會幫助我學習。

當您使用此模塊運行啓動進程時,整個程序將被複制到另一個進程中。但由於它不是「__main__」進程,而且我的代碼正在檢查這個進程,所以它無法啓動另一個進程。它只是停下來坐在那裏等着做什麼,像殭屍。在調用multiprocess.Process()的時候,在父進程中初始化的所有東西都已經設置完畢並準備就緒。一旦你在multiprocess.Queue或者共享內存,或者管道等等(不管你是在通信)中放置什麼東西,那麼單獨的進程就會接收它並開始工作。它可以利用所有導入的模塊並設置,就像它是父項一樣。但是,一旦內部狀態變量在父進程或獨立進程中發生變化,這些更改就會被隔離。一旦這個過程產生了,現在就成爲你的工作,如果有必要,通過隊列,管道,共享內存等使它們保持同步。

我拋出代碼並重新開始,但現在我只是把在ProcessWorker中有一個額外的功能,一個運行命令行的「執行」方法。很簡單。我不必擔心以這種方式啓動並關閉一堆進程,這在過去使用C++導致了各種不穩定和性能問題。當我開始在開始時啓動進程,然後將消息傳遞給這些等待進程時,我的性能得到了改善,並且非常穩定。

BTW,我看着這個鏈接以獲得幫助,扔我,因爲例子讓我覺得方法正在整個隊列運輸:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html 使用的第一部分的第二個例子「next_task()」說(對我來說)執行通過隊列接收到的任務。

+1

正如我在你的問題的評論中指出的那樣,如果你想醃製一個無需擔心依賴關係的類實例,你應該使用'dill',它既可以醃製一個類使用類實例進行定義,*或*醃製大多數對象(包括用戶定義的類)的源代碼和依賴項。 'multiprocessing'的分支(在問題的評論中提到)使用'dill'進行序列化......從而避免了大部分你所描述的問題。 –

0

REF:當他說他被 http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html誤導在1月6日https://stackoverflow.com/a/14179779

答案在6:03由大衛·林奇不屬實。

所提供的代碼和示例是正確的,並按廣告所述工作。 next_task()執行通過隊列接收到的任務 - 嘗試並瞭解Task.__call__()方法正在做什麼。

在我的情況下,絆倒了我是在執行run()的語法錯誤。看來子進程不會報告這個,只是默默地失敗 - 讓事情陷入奇怪的循環中!確保你有某種類型的語法檢查器運行Flymake/Pyflakes在Emacs中。

通過multiprocessing.log_to_stderr()調試F幫助我縮小了問題範圍。