2010-06-25 77 views
6

我在使用這段代碼的死鎖問題:的Python multiprocessing.Queue死鎖,並得到


def _entropy_split_parallel(data_train, answers_train, weights): 
    CPUS = 1 #multiprocessing.cpu_count() 
    NUMBER_TASKS = len(data_train[0]) 
    processes = [] 

    multi_list = zip(data_train, answers_train, weights) 

    task_queue = multiprocessing.Queue() 
    done_queue = multiprocessing.Queue() 

    for feature_index in xrange(NUMBER_TASKS): 
     task_queue.put(feature_index) 

    for i in xrange(CPUS): 
     process = multiprocessing.Process(target=_worker, 
       args=(multi_list, task_queue, done_queue)) 
     processes.append(process) 
     process.start() 

    min_entropy = None 
    best_feature = None 
    best_split = None 
    for i in xrange(NUMBER_TASKS): 
     entropy, feature, split = done_queue.get() 
     if (entropy < min_entropy or min_entropy == None) and entropy != None: 
      best_feature = feature 
      best_split = split 

    for i in xrange(CPUS): 
     task_queue.put('STOP') 

    for process in processes: 
     process.join() 

    return best_feature, best_split 


def _worker(multi_list, task_queue, done_queue): 
    feature_index = task_queue.get() 
    while feature_index != 'STOP': 
     result = _entropy_split3(multi_list, feature_index) 
     done_queue.put(result) 
     feature_index = task_queue.get() 

當我運行我的程序,它通過_entropy_split_parallel工作正常多次運行,但最終死鎖。父進程在done_queue.get()上被阻止,並且工作進程在done_queue.put()上被阻止。由於在發生這種情況時隊列總是空的,因此預計在get上阻塞。我不明白的是爲什麼工人在put上阻塞,因爲隊列顯然沒有滿(這是空的!)。我試過blocktimeout關鍵字參數,但得到相同的結果。

我正在使用多處理backport,因爲我被困在Python 2.5中。


編輯:它看起來像我也越來越與多處理模塊提供的示例之一的死鎖問題。這是底部的第三個例子here.如果我多次調用測試方法,死鎖似乎只會發生。例如,更改腳本的底部,這樣的:


if __name__ == '__main__': 
    freeze_support() 
    for x in xrange(1000): 
     test() 

編輯:我知道這是一個老問題,但測試表明,這已不再是與Python 2.7 Windows中的一個問題。我會嘗試Linux並回報。

回答

0

這個問題與較新版本的Python不兼容,所以我假設它是一個backport的問題。無論如何,這不再是一個問題。

4

我認爲問題是父線程加入它已經通過隊列的子線程。這是討論多處理模塊的programming guidelines section

無論如何,我遇到了與您描述的相同的症狀,並且在重構我的邏輯以便主線程未加入子線程時,沒有發生死鎖。我的重構邏輯涉及知道我應該從結果或「完成」隊列中獲得的項目數量(可以根據子線程數量或工作隊列上的項目數量等進行預測),以及循環無限地收集所有這些信息。邏輯

「玩具」插圖:

num_items_expected = figure_it_out(work_queue, num_threads) 
items_received = [] 
while len(items_received) < num_items_expected: 
    items_received.append(done_queue.get()) 
    time.sleep(5) 

上述邏輯避免了父線程加入子線程的需要,但允許父線程阻塞,直到所有的孩子都做了。這種方法避免了我的死鎖問題。

+0

我認爲所有隊列在進程加入時應該是空的,所以這應該不成問題。另外,主進程在put上死鎖,而不是加入。我剛剛升級了Python(我被一箇舊版本卡住了),所以我會再次測試這個。 – ajduff574 2010-08-26 06:22:33

+0

@ajduff在我的情況下,死鎖沒有發生在連接上,但是放置也是如此,除了放在子線程中。另外,就我而言,正在輸入的隊列是空的。所以我認爲這也值得一試(也就是避免主線程加入子線程)。 – Jeet 2010-08-26 10:01:58