2017-04-25 34 views
1

我在使用回調函數和處理程序的Python中出現了一個奇怪的現象。 我使用ZMQ來處理我的通信併爲套接字使用流。我有基類:回調函數在實例中看不到正確的值

import multiprocessing  
import zmq 
from concurrent.futures import ThreadPoolExecutor 
from zmq.eventloop import ioloop, zmqstream 
from zmq.utils import jsonapi as json 

# Types of messages 
TYPE_A = 'type_a' 
TYPE_B = 'type_b' 


class ZmqProcess(multiprocessing.Process): 
    def __init__(self): 
     super(ZmqProcess, self).__init__() 
     self.context = None 
     self.loop = None 
     self.handle_stream = None 

    def setup(self): 
     self.context = zmq.Context() 
     self.loop = ioloop.IOLoop.instance() 

    def send(self, msg_type, msg, host, port): 
     sock = zmq.Context().socket(zmq.PAIR) 
     sock.connect('tcp://%s:%s' % (host, port)) 
     sock.send_json([msg_type, msg]) 

    def stream(self, sock_type, addr): 
     sock = self.context.socket(sock_type) 
      if isinstance(addr, str): 
      addr = addr.split(':') 
     host, port = addr if len(addr) == 2 else (addr[0], None) 
      if port: 
      sock.bind('tcp://%s:%s' % (host, port)) 
     else: 
      port = sock.bind_to_random_port('tcp://%s' % host) 
     stream = zmqstream.ZMQStream(sock, self.loop)  
     return stream, int(port) 

class MessageHandler(object): 
    def __init__(self, json_load=-1): 
     self._json_load = json_load 
     self.pool = ThreadPoolExecutor(max_workers=10) 

    def __call__(self, msg): 
     i = self._json_load 
     msg_type, data = json.loads(msg[i]) 
     msg[i] = data 
     if msg_type.startswith('_'): 
      raise AttributeError('%s starts with an "_"' % msg_type) 
     getattr(self, msg_type)(*msg) 

而且我有一個類,從它繼承:

import zmq  
import zmq_base  

class ZmqServerMeta(zmq_base.ZmqProcess): 
    def __init__(self, bind_addr, handlers): 
     super(ZmqServerMeta, self).__init__() 
     self.bind_addr = bind_addr 
     self.handlers = handlers 

    def setup(self): 
     super(ZmqServerMeta, self).setup() 
     self.handle_stream, _ = self.stream(zmq.PAIR, self.bind_addr) 
     self.handle_stream.on_recv(StreamHandler(self.handle_stream, self.stop, 
               self.handlers)) 

    def run(self): 
     self.setup() 
     self.loop.start() 

    def stop(self): 
     self.loop.stop() 

class StreamHandler(zmq_base.MessageHandler): 
    def __init__(self, handle_stream, stop, handlers): 
     super(StreamHandler, self).__init__() 
     self._handle_stream = handle_stream 
     self._stop = stop 
     self._handlers = handlers 

    def type_a(self, data): 
     if zmq_base.TYPE_A in self._handlers: 
      if self._handlers[zmq_base.TYPE_A]: 
       for handle in self._handlers[zmq_base.TYPE_A]: 
        self.pool.submit(handle, data) 
      else: 
       pass 
     else: 
      pass 

    def type_b(self, data): 
     if zmq_base.TYPE_B in self._handlers: 
      if self._handlers[zmq_base.TYPE_B]: 
       for handle in self._handlers[zmq_base.TYPE_B]: 
        self.pool.submit(handle, data) 
      else: 
       pass 
     else: 
      pass 

    def endit(self): 
     self._stop() 

此外,我有我想作爲存儲使用的類。這裏是麻煩就來了:

import threading 
import zmq_server_meta as server 
import zmq_base as base 


class Storage: 
    def __init__(self): 
     self.list = [] 

     self.list_lock = threading.RLock() 

     self.zmq_server = None 
     self.host = '127.0.0.1' 
     self.port = 5432 
     self.bind_addr = (self.host, self.port) 

    def setup(self): 
     handlers = {base.TYPE_A: [self. remove]} 
     self.zmq_server = server.ZmqServerMeta(handlers=handlers, bind_addr=self.bind_addr) 
     self.zmq_server.start() 

    def add(self, data): 
     with self.list_lock: 
      try: 
       self.list.append(data) 
      except: 
       print "Didn't work" 

    def remove(self, msg): 
     with self.list_lock: 
      try: 
       self.list.remove(msg) 
      except: 
       print "Didn't work" 

的想法是,那類存儲一些全局它接受到的信息。 它是在一個文件中的所有開始測試:

import sys 
import time 
import storage 
import zmq_base as base 
import zmq_server_meta as server 



def printMsg(msg): 
    print msg 

store = storage.Storage() 

store.setup() 
handlers = {base.TYPE_B: [printMsg]} 
client = server.ZmqServerMeta(handlers=handlers, bind_addr=('127.0.0.1', 5431)) 
client.start() 

message = "Test" 

store.add(message) 
client.send(base.TYPE_A, message, '127.0.0.1', 5432) 

我簡化它以減少混亂。它不是僅僅添加它,而是通常發送然後回覆。響應,客戶端發送,應該通過正確的回調remove()來處理,並且它應該從列表中刪除某些內容。發生的問題是,remove()函數會看到一個空列表,儘管列表中應該有一個元素。如果從測試文件中檢查,我可以在添加元素後看到該元素,如果從那裏調用remove(),則會看到一個非空列表並可以將其刪除。我的問題是,爲什麼回調看到一個空的列表,我如何確保它看到列表中的正確元素?

親切的問候 帕特里克

回答

1

我相信,該ZmqProcess類從multiprocessing.Process繼承的事實問題奠定。多重不允許不同的進程間共享的對象,除了通過使用值或陣列的共享存儲器映射(如可以在文檔中可以看出:https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes

如果你想使用自定義的對象,你可以使用服務器進程/代理對象,可以在文檔的同一頁面找到。

所以,你可以,例如,在存儲類等的初始化函數定義經理:self.manager = Manager()然後你把self.list = self.manager.list()。這應該做的伎倆。

相關問題