2011-11-06 43 views
6

好了,因爲目前還沒有答案的,我不覺得太糟糕了這樣做。 雖然我在什麼是真正幕後發生的事情仍有意造成這個問題,我的最緊迫的問題是那些在更新2.指定的那些之中,子流程完成,但仍然沒有終止,造成僵局

什麼是JoinableQueueManager().Queue()之間的差異(和你應該什麼時候使用一個呢?)。重要的是,在這個例子中,替換另一個是否安全?


在下面的代碼中,我有一個簡單的進程池。每個進程通過進程隊列(pq)拉從處理的數據,並且返回值隊列(rq)來傳遞的處理的返回值返回到主線程。如果我不追加到返回值隊列中,它會起作用,但是一旦我這樣做了,出於某種原因阻止了進程停止。在這兩種情況下的過程run方法返回,所以它不是put在返回隊列阻塞,但在第二種情況下,過程本身不終止,所以程序死鎖當我join上的進程。爲什麼會這樣?

更新:

  1. 它似乎有東西在隊列中的項目數。

    在我的機器至少,我可以在隊列高達6570個項目和它的實際工作,但比這更多,它死鎖。

  2. 它似乎與Manager().Queue()一起使用。

    無論它是JoinableQueue限制或只是我誤解了兩個對象之間的差異,我發現,如果我有一個Manager().Queue()更換回隊列,它將按預期工作。它們之間有什麼區別,你應該什麼時候使用它們?

  3. 如果我從rq

    OOP消費不會發生錯誤。這裏有一個答案,當我評論它時,它消失了。無論如何,它所說的一件事是質疑,如果我添加一個消費者,這個錯誤仍然會發生。我已經嘗試過了,答案是,沒有。

    它提到的另一件事是從the multiprocessing docs這句話作爲一個可能的關鍵問題。參照JoinableQueue的,它說:

    ...用來計數的未完成任務數的旗語可 最終溢出引發異常。


import multiprocessing 

class _ProcSTOP: 
    pass 

class Proc(multiprocessing.Process): 

    def __init__(self, pq, rq): 
     self._pq = pq 
     self._rq = rq 
     super().__init__() 
     print('++', self.name) 

    def run(self): 
     dat = self._pq.get() 

     while not dat is _ProcSTOP: 
#   self._rq.put(dat)  # uncomment me for deadlock 
      self._pq.task_done() 
      dat = self._pq.get() 

     self._pq.task_done() 
     print('==', self.name) 

    def __del__(self): 
     print('--', self.name) 

if __name__ == '__main__': 

    pq = multiprocessing.JoinableQueue() 
    rq = multiprocessing.JoinableQueue() 
    pool = [] 

    for i in range(4): 
     p = Proc(pq, rq) 
     p.start() 
     pool.append(p) 

    for i in range(10000): 
     pq.put(i) 

    pq.join() 

    for i in range(4): 
     pq.put(_ProcSTOP) 

    pq.join() 

    while len(pool) > 0: 
     print('??', pool) 
     pool.pop().join() # hangs here (if using rq) 

    print('** complete') 

示例輸出,不使用回隊列:

++ Proc-1 
++ Proc-2 
++ Proc-3 
++ Proc-4 
== Proc-4 
== Proc-3 
== Proc-1 
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>] 
== Proc-2 
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>] 
-- Proc-3 
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>] 
-- Proc-2 
?? [<Proc(Proc-1, stopped)>] 
-- Proc-1 
** complete 
-- Proc-4 

示例輸出,使用返回隊列:

++ Proc-1 
++ Proc-2 
++ Proc-3 
++ Proc-4 
== Proc-2 
== Proc-4 
== Proc-1 
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>] 
== Proc-3 
# here it hangs 
+0

可能是相關的:http://bugs.python.org/issue8237 – jfs

+0

@ J.F.Sebastian。這可能是,但似乎是說,它阻止了'put',所有'run'的返回,並且'put'只在'run'內出現,所以我的'put'不能被阻止。 – tjm

回答

0

documentation

警告

正如上面提到的,如果一個子進程已經把項目一個隊列(它沒有使用JoinableQueue.cancel_join_thread()),那麼該過程將不會終止,直到所有緩衝項目已被刷新到管道。

這意味着如果您嘗試加入該進程,則可能會發生死鎖,除非您確定已放入隊列的所有項目都已被使用。同樣,如果子進程是非守護進程,那麼父進程在嘗試加入所有非守護進程子進程時可能會在退出時掛起。

請注意,使用管理器創建的隊列不存在此問題。請參閱編程準則。

因此,JoinableQueue()使用一個管道,並將等待,直到它可以在關閉之前刷新所有數據。

另一方面,Manager.Queue()對象使用完全不同的方法。 管理員正在運行一個單獨的進程,可以立即接收所有數據(並將其存儲在內存中)。

經理提供了一種方法來創建可以在不同進程之間共享的數據。管理員對象控制管理共享對象的服務器進程。其他進程可以通過使用代理來訪問共享對象。

...

隊列([MAXSIZE]) 創建共享Queue.Queue對象,並返回一個代理它。