2012-01-24 84 views
3

我有一個應用程序,它實現了像Python中的責任鏈。有一個通過multiprocessing.Queue()將對象傳遞給其他進程然後對對象執行操作的進程。對傳遞的對象的最後修改時間進行跟蹤也很重要,因此只有在修改對象時才能執行操作。Python多處理.Queue修改對象

我遇到的問題是,對象中的_modified屬性在從隊列中提取後會隨機更改。但是_mtime屬性總是正確的。下面的示例將運行並(有意)隨機修改DummyObject,然後將其放置在每個處理程序進程的隊列中。然後每個處理程序將打印它們在對象中收到的_modified和_mtime值。我期望_modified值在command_func和處理函數中都是相同的,但通常情況並非如此。如果我從DummyObject中刪除Object_w_mtime繼承,那麼在發送和接收的對象中看不到任何差異。

我對python比較陌生。就我所知,應該發生的事情是每次將一個對象放在隊列中,將其醃製,然後通過管道發送到接收過程,從而取消對該對象的反應。那是對的嗎?有沒有什麼方法可以在對象被醃漬/取消時對象繼承會被搞砸?

我在Ubuntu 11.10上用Python 2.7.2和2.6.7以及Ubuntu 11.04上的python 2.7.1測試了它。有時你必須讓它運行一分鐘左右才能看到行爲,因爲它看起來是隨機的。

在這裏抓住吸管,在此先感謝。

import multiprocessing 
import time 
import traceback 
import os 
import random 

class Object_w_mtime(object): 
    ''' 
    Parent object that tracks the last time an attribute was modified 
    ''' 
    def __setattr__(self,a_name,a_value): 
     if ((a_name not in ('_mtime','_modified')) and 
      (a_value != getattr(self,a_name,None)) 
     ): 
      object.__setattr__(self, '_modified', True) 
      object.__setattr__(self, '_mtime', time.time()) 
     object.__setattr__(self, a_name, a_value) 
     return True 
    #END def 

    def reset(self): 
     self._modified = False 
#END class 

class DummyObject(Object_w_mtime): 
    def __init__(self): 
     self.value = 10 

def handler(in_queue = None, handler_id = None): 
    print 'PID:' + str(os.getpid()) + ':handler{0}:<RUN>'.format(handler_id) 
    while True: 
     try: 
      obj = in_queue.get(True,61) 
      print 'handler{} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime 
     except multiprocessing.queues.Empty: 
      break 
     except KeyboardInterrupt: 
      break 
     except Exception as e: 
      print traceback.format_exc() 
      break 
    return True 
#END def 

def command_func(next_links = None): 
    print 'PID:' + str(os.getpid()) + ':command_func:<RUN>' 
    obj = DummyObject() 
    while True: 
     try: 
      # randomly assign a different value to test with a modified and unmodified object 
      obj.value = random.randint(0,1) 
      print '**************** obj.value = {0} ***************'.format(obj.value) 
      print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime 
      for each in next_links: 
       each.put(obj,False) 
     except multiprocessing.queues.Empty: 
      break 
     except KeyboardInterrupt: 
      break 
     except Exception as e: 
      print e 
      print traceback.format_exc() 
      break 
     obj.reset() 
     time.sleep(3) 
    return True 
#END def 


if __name__ == '__main__': 
    handler_queues = list() 
    handler_processes = list() 
    # Create a queue and process object for each command handler 
    for handler_id in range(1,4): 
     queue = multiprocessing.Queue() 
     process = multiprocessing.Process(target=handler, args=(queue, handler_id)) 
     handler_queues.append(queue) 
     handler_processes.append(process) 

    try: 
     # spawn handler processes 
     for process in handler_processes: 
      process.start() 
     # Start sending commands to handlers 
     command_func(handler_queues) 

    # exit on keyboard interrupt 
    except KeyboardInterrupt: 
     for process in handler_processes: 
      process.join() 
    except Exception: 
     traceback.print_exc() 

回答

5

簡而言之,您在將其放入隊列後修改obj

查看http://svn.python.org/view/python/trunk/Lib/multiprocessing/queues.py?revision=76434&view=markup第285行,put()僅將對象放置在內部隊列中,如果尚未運行,則啓動後臺線程以處理來自該隊列的對象。因此,代碼中的each.put(obj,False)obj.reset()之間存在競爭。

您應該只能使用具有不可變(副本)對象的隊列。

+0

謝謝,這完美地解釋了這個問題。 –