2014-10-08 157 views
1

我在一個典型的「pub/sub」設置中有一組長時間運行的進程用於通信隊列。Python多處理 - 看門狗進程?

我希望做兩件事情,我無法弄清楚如何既同時完成:

  1. 添加/刪除工人。例如,如果我看到我的待處理隊列大小變得太大,我希望能夠添加額外的使用者。
  2. 我的流程的看門狗 - 我希望得到通知,如果我的任何生產者或消費者崩潰。

我可以在隔離做(2):

try: 
    while True: 
     for process in workers + consumers: 
      if not process.is_alive(): 
       logger.critical("%-8s%s died!", process.pid, process.name) 
     sleep(3) 
except KeyboardInterrupt: 
    # Python propagates CTRL+C to all workers, no need to terminate them 
    logger.warn('Received CTR+C, shutting down') 

上述塊,這防止了我這樣做(1)。

所以我決定將代碼移入自己的進程。

這不起作用,因爲process.is_alive()只適用於父母檢查其子女的狀態。在這種情況下,我想檢查的過程將是兄弟姐妹而不是孩子。

我對如何繼續有點難住。我的主進程如何支持對子進程的更改,同時還監視子進程?

+0

你不能直接做到這一點,至少不能用你可以稱之爲「有意義的可讀代碼」的方式。要管理它,您需要一個抽象級別,將作業分配給可以接收命令以向上/向下縮放的工作人員。坦率地說,編寫這個軟件是相當複雜的事情,並且有準備好的系統可以這樣做,看看芹菜。 – 2014-10-08 01:12:09

+0

@Puciek我在其他項目上使用過芹菜。它服務於不同的用例(AFAIK) - 啓動異步作業。我從來沒有聽說過用它來管理長期生產者和消費者。 – knite 2014-10-08 01:15:25

+0

您可以很好地使用它來啓動長時間運行的作業,包括使用者服務器 - 最後所有腳本的創建都是平等的,只要記得禁用超時即可。它配備了您似乎在尋找的自動縮放功能。 – 2014-10-08 01:17:23

回答

0

multiprocessing.Pool其實內置了看門狗。它運行一個線程,每隔0.1秒檢查一次工人是否死亡。如果有,它開始一個新的替代它的位置:

def _handle_workers(pool): 
    thread = threading.current_thread() 

    # Keep maintaining workers until the cache gets drained, unless the pool 
    # is terminated. 
    while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 
     pool._maintain_pool() 
     time.sleep(0.1) 
    # send sentinel to stop workers 
    pool._taskqueue.put(None) 
    debug('worker handler exiting') 

def _maintain_pool(self): 
    """Clean up any exited workers and start replacements for them. 
    """ 
    if self._join_exited_workers(): 
     self._repopulate_pool() 

這主要是用來實現maxtasksperchild關鍵字參數,實際上是在某些情況下出現問題。如果某個進程在mapapply命令正在運行時死亡,並且該進程處於處理與該調用相關聯的任務的中間,則該命令將永遠不會結束。有關該行爲的更多信息,請參閱this question。這就是說,如果你只是想知道一個進程已經死了,你可以創建一個監視池中所有進程的pid的線程(而不是一個進程),並且如果列表中的pid永遠不會變化,你知道一個進程已崩潰:

def monitor_pids(pool): 
    pids = [p.pid for p in pool._pool] 
    while True: 
     new_pids = [p.pid for p in pool._pool] 
     if new_pids != pids: 
      print("A worker died") 
      pids = new_pids 
     time.sleep(3) 

編輯:

如果你滾你自己Pool實現,你可以只取multiprocessing.Pool的提示,並在運行監管碼父進程中的後臺線程。檢查進程是否仍在運行的檢查很快,所以丟失到GIL後臺線程的時間應該可以忽略不計。考慮到multiprocessing.Process看門狗每0.1秒運行一次!每3秒運行一次就不會造成任何問題。

+0

我沒有使用游泳池,因爲我的製作人沒有做同樣的工作。但我會看看源代碼,看看我能否借用它的一些功能 - 看起來有點棘手,但是... – knite 2014-10-08 01:18:00

+0

@knite那麼,你仍然可以使用一個'Pool'來實現它。並不是'multiprocessing.Pool'中的每個進程都需要完成相同的工作。不過,使用'multiprocessing.Pool'獲得自動縮放比較麻煩。 – dano 2014-10-08 01:22:13