2014-10-09 61 views
2

說我這樣做:Pool.map - 爲什麼工作進程不會提前崩潰?

import multiprocessing as mp 

def f(x): 
    raise OverflowError # raised BEFORE the print 
    print x 

if __name__ == '__main__': 
    pool = mp.Pool(processes=1) 
    for _ in pool.imap_unordered(f, range(10)): 
     pass 
    pool.close() 
    pool.join() 

日期:

Traceback (most recent call last): 
    File "test0.py", line 9, in <module> 
    for _ in pool.imap_unordered(f, range(10)): 
    File "/Users/usualme/anaconda/lib/python2.7/multiprocessing/pool.py", line 659, in next 
    raise value 
OverflowError 

好輸出是有道理的。在print聲明之前引發異常,所以沒有輸出。現在幾乎相同的代碼,但我換2號線:

import multiprocessing as mp 

def f(x): 
    print x 
    raise OverflowError # raised AFTER the print 

if __name__ == '__main__': 
    pool = mp.Pool(processes=1) 
    for _ in pool.imap_unordered(f, range(10)): 
     pass 
    pool.close() 
    pool.join() 

日期:

0 
1 
2 
3 
4 
5 
6 
7 
8 
9 
Traceback (most recent call last): 
    File "test0.py", line 9, in <module> 
    for _ in pool.imap_unordered(f, range(10)): 
    File "/Users/usualme/anaconda/lib/python2.7/multiprocessing/pool.py", line 659, in next 
    raise value 
OverflowError 

我不明白的輸出。我希望數字0跟着堆棧跟蹤,或者所有10個數字和10堆棧跟蹤。爲什麼它會打印所有的數字和只有一個堆棧跟蹤?爲什麼工作進程等待最終崩潰?

回答

2

這只是一個時機 - worker進程並不在意在運行的函數中引發異常,它只是將異常返回給父級,並且隨着下一個任務一起繼續執行。下面是它的運行循環(略簡體):

while maxtasks is None or (maxtasks and completed < maxtasks): 
    try: 
     task = get() # Get task from parent 
    except (EOFError, OSError): 
     util.debug('worker got EOFError or OSError -- exiting') 
     break 

    if task is None: 
     util.debug('worker got sentinel -- exiting') 
     break 

    job, i, func, args, kwds = task 
    try: 
     result = (True, func(*args, **kwds)) # Call the function pass from the parent 
    except Exception as e: # We end up in here if the worker raises an exception 
     if wrap_exception: 
      e = ExceptionWithTraceback(e, e.__traceback__) 
     result = (False, e) # The exception object is stored as the result 

    put((job, i, result)) # Send result to parent process 

所以,即使第一個任務將引發異常,它需要的時間一點點,結果兩個進程之間旅行,併爲父母過程實際上將結果拉出Queue並提高Exception。在那個時間窗口中,工作人員能夠執行所有剩餘的任務。如果你使職工功能更慢,你會看到它執行任務少:

import multiprocessing as mp 
import time 

def f(x): 
    print x 
    time.sleep(2) 
    raise OverflowError 

if __name__ == '__main__': 
    pool = mp.Pool(processes=1) 
    for _ in pool.imap_unordered(f, range(10)): 
     pass 
    pool.close() 
    pool.join() 

輸出:

0 
1 
Traceback (most recent call last): 
    File "p.py", line 11, in <module> 
    for _ in pool.imap_unordered(f, range(10)): 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 626, in next 
    raise value 
OverflowError 

也僅會看到,如果你通過一個更大的結果的一定百分比印刷可迭代的,因爲工人在父母死亡之前沒有足夠的時間來完成所有的工作。

您只會看到實際得到的一個異常,因爲從父母的角度來看,只要一個任務失敗,就應該中止整個imap調用。父進程從單個Queue開始依次從其所有子進程中獲取結果,因此一旦它看到第一個異常,imap調用即告結束,所以其餘任務的結果將被拋棄。