2012-12-30 45 views
3

我最近張貼的問題Using multiprocessing for finding network paths,並高興地已主動提出@unutbupython多處理掛起,潛在的隊列內存錯誤?

很好地解決執行test_workers()(利用多處理)功能時,我卻已經陷入困境。該代碼運行,但掛起了大量的節點N在我的網絡中G

運行使用Mac OS X Lion 10.7.5 - python 2.7時,它掛起時,N> 500。測井帶來下述信息,之後將其掛起

[DEBUG/MainProcess] doing self._thread.start() 
[DEBUG/MainProcess] starting thread to feed data to pipe 
[DEBUG/MainProcess] ... done self._thread.start() 

運行在通過VMware融合視窗7有利於較大的網絡,但最終與圍繞其中N> 20000個節點(I將理想地要使用此網絡上向上圖表掛起到N = 500,000)。來自窗邊懸掛點的消息:

[DEBUG/MainProcess] starting thread to feed data to pipe 
[DEBUG/MainProcess] ... done self._thread.start()[DEBUG/MainProcess] telling queue thread to quit 
Traceback (most recent call last): 
     File "C:\Users\Scott\Desktop\fp_test.py", line 75, in <module> 
    Traceback (most recent call last): 
      File "C:\Python27\lib\multiprocessing\queues.py", line 264, in _feed 
    test_workers() 
    MemoryError 

我想知道是否有人對此有何看法?並且如果有任何關於如何使這項工作適用於大型網絡的建議?

非常感謝您的任何建議,您可能有。

@ unutbu代碼:

import networkx as nx 
import multiprocessing as mp 
import random 
import sys 
import itertools as IT 
import logging 
logger = mp.log_to_stderr(logging.DEBUG) 


def worker(inqueue, output): 
    result = [] 
    count = 0 
    for pair in iter(inqueue.get, sentinel): 
     source, target = pair 
     for path in nx.all_simple_paths(G, source = source, target = target, 
             cutoff = None): 
      result.append(path) 
      count += 1 
      if count % 10 == 0: 
       logger.info('{c}'.format(c = count)) 
    output.put(result) 

def test_workers(): 
    result = [] 
    inqueue = mp.Queue() 
    for source, target in IT.product(sources, targets): 
     inqueue.put((source, target)) 
    procs = [mp.Process(target = worker, args = (inqueue, output)) 
      for i in range(mp.cpu_count())] 
    for proc in procs: 
     proc.daemon = True 
     proc.start() 
    for proc in procs:  
     inqueue.put(sentinel) 
    for proc in procs: 
     result.extend(output.get()) 
    for proc in procs: 
     proc.join() 
    return result 

def test_single_worker(): 
    result = [] 
    count = 0 
    for source, target in IT.product(sources, targets): 
     for path in nx.all_simple_paths(G, source = source, target = target, 
             cutoff = None): 
      result.append(path) 
      count += 1 
      if count % 10 == 0: 
       logger.info('{c}'.format(c = count)) 

    return result 

sentinel = None 

seed = 1 
m = 1 
N = 1340//m 
G = nx.gnm_random_graph(N, int(1.7*N), seed) 
random.seed(seed) 
sources = [random.randrange(N) for i in range(340//m)] 
targets = [random.randrange(N) for i in range(1000//m)] 
output = mp.Queue() 

if __name__ == '__main__': 
    test_workers() 
    # test_single_worker() 
    # assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker())) 

回答

2

您在死鎖附帶的logging模塊。

該模塊保留了一些線程鎖以允許跨線程進行安全日誌記錄,但是在當前進程分叉時它不能很好地運行。例如,請參閱here以瞭解正在發生的情況。

解決方法是取消logging調用或使用普通的print

無論如何,作爲一般規則,避免使用線程+分叉。並且始終檢查哪些模塊在幕後使用線程。

請注意,在Windows上,它的工作原理很簡單,因爲windows沒有fork,因此沒有鎖定克隆隨後死鎖的問題。 在這種情況下,MemoryError表明該進程消耗的RAM太多。 您可能不得不重新考慮使用較少RAM的算法,但它與您在OSX上遇到的問題完全不同。

+0

刪除了日誌記錄調用後,我仍然發現代碼掛在OSX上。你認爲其他模塊是否保持線程鎖定? (以及我怎樣才能找到答案?) - 另外;你知道我使用的隊列大小是否有限制嗎? –

+0

@scott_ouce搜索一下我在跟蹤'multiprocessing'的問題中發現了這個[issue](http://bugs.python.org/issue7200)。它可能與你的問題有關,因爲它涉及到MacOSX和'mp.Queue'。 無論如何,它似乎真的很奇怪,因爲我沒有在代碼中看到可能的weakref問題。 – Bakuriu