I want to create a class that can run a separate process to do some work that takes a long time, launch a bunch of these from a main module, and then wait for them all to finish. I want to launch the processes once and then keep feeding them things to do, rather than creating and destroying processes. For example, maybe I have 10 servers running the dd command, and then I want them all to scp a file, and so on. How do I use multiprocessing with class instances in Python?
My ultimate goal is to create a class for each system that tracks the information of the system it is tied to, such as IP address, logs, and runtime. But that class must be able to launch a system command and then return execution to the caller while the system command runs, so that I can follow up later with the result of the system command.
My attempt failed because I cannot send an instance method of a class over a pipe to the subprocess via pickle. Those are not picklable. I tried to fix it in various ways, but I can't figure it out. How can my code be patched to do this? What good is multiprocessing if you can't send over anything useful?
Is there any good documentation of multiprocessing being used with class instances? The only way I can get the multiprocessing module to work is with simple functions. Every attempt to use it within a class instance has failed. Maybe I should pass events instead? I don't understand how to do that yet.
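For orientation, the usual workaround for the pickling failure described above is to never put a bound method on the queue at all: put a picklable *description* of the work (here, a method-name string plus arguments) and let the worker process call `getattr()` on its own copy of the object. This is a minimal sketch in Python 3 syntax; the `Server` class and `run_command` name are illustrative, not part of the original code.

```python
# Sketch: send (method_name, args) tuples instead of bound methods.
import multiprocessing

class Server(object):
    def __init__(self, ip):
        self.ip = ip

    def run_command(self, command):
        # Stand-in for launching dd/scp via subprocess on this server
        return "%s ran %r" % (self.ip, command)

def worker(server, task_q, result_q):
    """Runs in the child process; loops until a None poison pill arrives."""
    while True:
        task = task_q.get()
        if task is None:
            break
        method_name, args = task                    # only plain strings/tuples were pickled
        result = getattr(server, method_name)(*args)
        result_q.put(result)

if __name__ == '__main__':
    task_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker,
                                args=(Server("10.0.0.1"), task_q, result_q))
    p.start()
    task_q.put(("run_command", ("ls /",)))          # picklable task description
    print(result_q.get())                           # -> 10.0.0.1 ran 'ls /'
    task_q.put(None)
    p.join()
```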
import multiprocessing
import sys
import re


class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute a worker's commands in parallel.
    Once launched, it remains running, monitoring the task queue, until "None" is sent.
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q

    def run(self):
        """
        Overridden method provided by multiprocessing.Process. Called upon start().
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task_list is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
# End of ProcessWorker class


class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate
    processes, which sit and listen for something to do.
    This base class is called by each derived worker.
    """

    def __init__(self, config, index=None):
        self.config = config
        self.index = index
        # Launch the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()
            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place a command target from this class object into the task_q.
        NOTE: Any function decorated with this must use fetch_results() to get
        the target task's result value.
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs])  # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retrieve the results of the call.
        This blocks until the execution of the item in the queue is complete.
        """
        self.task_q.join()          # Wait for it to finish
        return self.result_q.get()  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running %s as process %s" % (command, multiprocessing.current_process().name)
        # In here, I will launch a subprocess to run a long-running system command
        # p = Popen(command), etc.
        # p.wait(), etc.

    def close(self):
        self.task_q.put(None)
        self.task_q.join()


if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()
    # Do more work... (this would actually be done in a distributor in another class)
    for worker in workers:
        worker.close()
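Separate from the pickling problem, the handshake above depends on JoinableQueue semantics: the consumer must call task_done() exactly once per get(), and join() blocks until every put item has been acknowledged. A standalone sketch of that protocol (Python 3 syntax, names illustrative):

```python
import multiprocessing

def consumer(q):
    while True:
        item = q.get()
        if item is None:    # poison pill means shutdown
            q.task_done()
            break
        # ... do the real work here ...
        q.task_done()       # must be called once per get()

if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    p = multiprocessing.Process(target=consumer, args=(q,))
    p.start()
    for item in ("dd", "scp"):
        q.put(item)
    q.put(None)
    q.join()    # blocks until task_done() was called for every put()
    p.join()
    print("all tasks acknowledged")
```

Forgetting a task_done() call makes join() block forever, which is a common source of hangs in this pattern.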
EDIT: I tried moving the ProcessWorker class, and the creation of the multiprocessing queues, outside of the Worker class, and then tried to manually pickle the worker instance. Even that doesn't work, and I get the error RuntimeError: Queue objects should only be shared between processes through inheritance. But I am only passing references to those queues into the worker instance? I am missing something fundamental. Here is the modified main section of the code:
if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker)  # FAIL: Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
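That RuntimeError is deliberate on multiprocessing's part: a Queue refuses to be pickled except while a Process it was handed to is being spawned. "Inheritance" means passing the queue through the Process constructor (or having it in scope at fork time), never pickling it yourself. A quick demonstration (Python 3 syntax):

```python
import multiprocessing
import pickle

def child(q):
    q.put("hello")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    try:
        pickle.dumps(q)          # pickling a queue by hand is rejected...
    except RuntimeError as e:
        print("refused:", e)     # ...with the same RuntimeError as above
    # ...but "inheritance" -- handing it to the Process constructor -- is fine:
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    print(q.get())               # -> hello
    p.join()
```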
Have you seen ['dispy'](http://dispy.sourceforge.net/)? It might save you a headache or two :) –
I couldn't find any examples of dispy that use classes. Everything seems to run from __main__, which is not how I intend to use it. My examples using multiprocessing.Process work fine from __main__, but failed when I tried to use classes and methods with state. –
I know this is late to the game, but class instances can easily be pickled if you use a fork of multiprocessing called 'pathos.multiprocessing'. If you need to use 'Queue' objects and such, then you can access the augmented forked 'Queues' by importing 'from processing import Queue'. 'pathos.multiprocessing' uses 'dill', which **does** serialize and send class definitions along with instances. –
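One further note, beyond what the comments above say: the original failure mode is specific to Python 2. On Python 3.5+, pickle serializes a bound method by reference (instance plus method name), so instance methods can cross the queue as long as the instance itself is picklable and the class is defined at module level. A minimal check (`Server` is an illustrative name):

```python
import pickle

class Server(object):          # must be at module level for pickle to find it
    def __init__(self, ip):
        self.ip = ip

    def run_command(self, command):
        return (self.ip, command)

s = Server("10.0.0.1")
m = pickle.loads(pickle.dumps(s.run_command))   # works on Python >= 3.5
print(m("ls /"))                                # -> ('10.0.0.1', 'ls /')
```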