2014-02-10 43 views

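Updating a queue in Python multiprocessing using a Manager

I'm designing Python multiprocessing code that works on a queue which may be updated during processing. The code below sometimes works, sometimes gets stuck, and sometimes raises an Empty error.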

import multiprocessing as mp

def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() is True:
            break
        else:
            picked = working_queue.get_nowait()
            if picked % 2 == 0:
                output_queue.put(picked)
            else:
                working_queue.put(picked+1)
    return

if __name__ == '__main__':
    manager = mp.Manager()
    static_input = range(100)
    working_q = manager.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
        if output_q.empty() is True:
            break
        results_bank.append(output_q.get_nowait())
    # The length of this list should equal len(static_input), the range used to
    # populate the input queue. In other words, it tells whether every item
    # placed on the queue was actually processed.
    print(len(results_bank))
    results_bank.sort()
    print(results_bank)

Should I use a list as a global variable and protect it with a lock, instead of a manager.Queue()?

This code works just fine on my machine. – plover

Thanks for the comment. Could you try it a few more times? For me it only works about three times out of ten. – Jaqo

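The problem is in 'if working_queue.empty() is True:'. A process can find the queue non-empty just before another process pops the last value, and its subsequent call to 'working_queue.get_nowait()' then raises an exception. –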
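The check-then-act race described in that comment can be avoided by attempting the get and handling the failure, instead of asking empty() first. This is a minimal sketch; the helper name `try_get` is hypothetical, and it relies only on the fact that both `queue.Queue` and `multiprocessing.Queue` raise `queue.Empty` from `get_nowait()`:

```python
import queue


def try_get(q):
    """Try one non-blocking get on q (hypothetical helper).

    Returns (True, item) on success, or (False, None) if q was empty at the
    moment of the call. "Check empty(), then get_nowait()" is two steps that
    another process can interleave with; a single get_nowait() wrapped in
    try/except is one step.
    """
    try:
        return True, q.get_nowait()
    except queue.Empty:
        return False, None


if __name__ == "__main__":
    q = queue.Queue()
    q.put(5)
    print(try_get(q))  # (True, 5)
    print(try_get(q))  # (False, None)
```

This only removes the crash. The multiprocessing documentation warns that `empty()` is not reliable and that an item just put on a `multiprocessing.Queue` may not be visible to `get_nowait()` immediately, so an `Empty` result still does not prove that no more work is coming.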

Answers

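I just added a try: except Exception: to handle the Empty error. Now the results seem consistent. Please let me know if you spot a problem I've overlooked in this solution.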

import multiprocessing as mp

def worker(working_queue, output_queue):
    while True:
        try:
            if working_queue.empty() is True:
                break
            else:
                picked = working_queue.get_nowait()
                if picked % 2 == 0:
                    output_queue.put(picked)
                else:
                    working_queue.put(picked+1)
        except Exception:
            continue

    return

if __name__ == '__main__':
    # The Manager seems to be unnecessary.
    #manager = mp.Manager()
    #working_q = manager.Queue()

    working_q = mp.Queue()
    output_q = mp.Queue()
    static_input = range(100)
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
        if output_q.empty() is True:
            break
        results_bank.append(output_q.get_nowait())
    # The length of this list should equal len(static_input), the range used to
    # populate the input queue. In other words, it tells whether every item
    # placed on the queue was actually processed.
    print(len(results_bank))
    results_bank.sort()
    print(results_bank)
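A different way to decide when workers may stop, sketched here as an assumption rather than as this answer's method: instead of polling `empty()`, count outstanding work with `multiprocessing.JoinableQueue`. Each worker calls `task_done()` once per `get()`, and re-queues an odd item before marking the current one done, so `working_q.join()` cannot return while any item is still in flight. One stop marker per worker then shuts them down. The names `SENTINEL` and `run` are hypothetical:

```python
import multiprocessing as mp

SENTINEL = None  # hypothetical stop marker; must never be a real work item


def worker(working_q, output_q):
    while True:
        picked = working_q.get()  # blocking get: no empty() check, no Empty error
        if picked is SENTINEL:
            working_q.task_done()
            break
        if picked % 2 == 0:
            output_q.put(picked)
        else:
            # Re-queue BEFORE task_done(), so the unfinished-task count
            # never drops to zero while this item still needs work.
            working_q.put(picked + 1)
        working_q.task_done()


def run(n_items, n_workers=None):
    n_workers = n_workers or mp.cpu_count()
    working_q = mp.JoinableQueue()
    output_q = mp.Queue()
    for i in range(n_items):
        working_q.put(i)
    procs = [mp.Process(target=worker, args=(working_q, output_q))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    working_q.join()  # returns once every put() has a matching task_done()
    for _ in procs:
        working_q.put(SENTINEL)  # one stop marker per worker
    # In this problem every input ends as exactly one even number, so the
    # main process knows how many results to read; read them BEFORE join().
    results = [output_q.get() for _ in range(n_items)]
    for p in procs:
        p.join()
    return sorted(results)


if __name__ == "__main__":
    print(len(run(10000)))  # 10000
```

The worker only uses `get`, `put` and `task_done`, so the same function also runs against a thread-based `queue.Queue`, which is convenient for testing its logic without starting processes.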

Just protect access to the shared data with a lock. It's safe (and will protect you from weird process behavior):

import multiprocessing as mp

def worker(working_queue, output_queue, lock):
    while True:
        should_break = False
        # "with" releases the lock even if the body raises.
        with lock:
            if working_queue.empty() is True:
                should_break = True
            else:
                picked = working_queue.get_nowait()
                if picked % 2 == 0:
                    output_queue.put(picked)
                else:
                    working_queue.put(picked+1)
        if should_break:
            break
    return

if __name__ == '__main__':
    manager = mp.Manager()
    static_input = range(1000)
    working_q = manager.Queue()
    output_q = mp.Queue()
    lock = mp.Lock()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q, lock)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
        if output_q.empty() is True:
            break
        results_bank.append(output_q.get_nowait())
    # The length of this list should equal len(static_input), the range used to
    # populate the input queue. In other words, it tells whether every item
    # placed on the queue was actually processed.
    print(len(results_bank))
    results_bank.sort()
    print(results_bank)

Thanks for the quick answer. I just tried your proposal with input ranges of 1000 and 10000. It works with the former, but gets stuck with the latter. – Jaqo
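The 1000-versus-10000 difference is consistent with a pitfall described in the multiprocessing programming guidelines ("Joining processes that use queues"): a process that has put items on a `multiprocessing.Queue` does not terminate until its feeder thread has flushed all buffered items into the underlying pipe, so joining it before those items are consumed can deadlock once the pipe's buffer fills. The code in the question and in both answers calls `proc.join()` before reading `output_q`, which would explain why small inputs work and large ones hang. A minimal sketch of the fix, assuming the number of expected results is known in advance (true here, since each input produces exactly one output); `producer` and `collect_then_join` are hypothetical names:

```python
import multiprocessing as mp


def producer(output_q, n):
    # Hypothetical stand-in for a worker that emits many results.
    for i in range(n):
        output_q.put(i)


def collect_then_join(procs, output_q, expected):
    """Drain `expected` items from output_q, then join the producers.

    Joining first can deadlock: a child that has put data on a
    multiprocessing.Queue does not exit until its buffered items are
    flushed into the pipe, and the pipe only empties when someone reads it.
    """
    results = [output_q.get() for _ in range(expected)]  # blocking get(), no empty() polling
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    q = mp.Queue()
    procs = [mp.Process(target=producer, args=(q, 10000)) for _ in range(4)]
    for p in procs:
        p.start()
    results = collect_then_join(procs, q, expected=4 * 10000)
    print(len(results))  # 40000
```

Reading with a blocking `get()` also avoids relying on `output_q.empty()`, which the documentation describes as unreliable for multiprocessing queues.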