我在使用回調函數和處理程序的Python中出現了一個奇怪的現象。 我使用ZMQ來處理我的通信併爲套接字使用流。我有基類:回調函數在實例中看不到正確的值
import multiprocessing
import zmq
from concurrent.futures import ThreadPoolExecutor
from zmq.eventloop import ioloop, zmqstream
from zmq.utils import jsonapi as json
# Types of messages
TYPE_A = 'type_a'
TYPE_B = 'type_b'
class ZmqProcess(multiprocessing.Process):
def __init__(self):
super(ZmqProcess, self).__init__()
self.context = None
self.loop = None
self.handle_stream = None
def setup(self):
self.context = zmq.Context()
self.loop = ioloop.IOLoop.instance()
def send(self, msg_type, msg, host, port):
sock = zmq.Context().socket(zmq.PAIR)
sock.connect('tcp://%s:%s' % (host, port))
sock.send_json([msg_type, msg])
def stream(self, sock_type, addr):
sock = self.context.socket(sock_type)
if isinstance(addr, str):
addr = addr.split(':')
host, port = addr if len(addr) == 2 else (addr[0], None)
if port:
sock.bind('tcp://%s:%s' % (host, port))
else:
port = sock.bind_to_random_port('tcp://%s' % host)
stream = zmqstream.ZMQStream(sock, self.loop)
return stream, int(port)
class MessageHandler(object):
def __init__(self, json_load=-1):
self._json_load = json_load
self.pool = ThreadPoolExecutor(max_workers=10)
def __call__(self, msg):
i = self._json_load
msg_type, data = json.loads(msg[i])
msg[i] = data
if msg_type.startswith('_'):
raise AttributeError('%s starts with an "_"' % msg_type)
getattr(self, msg_type)(*msg)
而且我有一個類,從它繼承:
import zmq
import zmq_base
class ZmqServerMeta(zmq_base.ZmqProcess):
def __init__(self, bind_addr, handlers):
super(ZmqServerMeta, self).__init__()
self.bind_addr = bind_addr
self.handlers = handlers
def setup(self):
super(ZmqServerMeta, self).setup()
self.handle_stream, _ = self.stream(zmq.PAIR, self.bind_addr)
self.handle_stream.on_recv(StreamHandler(self.handle_stream, self.stop,
self.handlers))
def run(self):
self.setup()
self.loop.start()
def stop(self):
self.loop.stop()
class StreamHandler(zmq_base.MessageHandler):
def __init__(self, handle_stream, stop, handlers):
super(StreamHandler, self).__init__()
self._handle_stream = handle_stream
self._stop = stop
self._handlers = handlers
def type_a(self, data):
if zmq_base.TYPE_A in self._handlers:
if self._handlers[zmq_base.TYPE_A]:
for handle in self._handlers[zmq_base.TYPE_A]:
self.pool.submit(handle, data)
else:
pass
else:
pass
def type_b(self, data):
if zmq_base.TYPE_B in self._handlers:
if self._handlers[zmq_base.TYPE_B]:
for handle in self._handlers[zmq_base.TYPE_B]:
self.pool.submit(handle, data)
else:
pass
else:
pass
def endit(self):
self._stop()
此外,我有我想作爲存儲使用的類。這裏是麻煩就來了:
import threading
import zmq_server_meta as server
import zmq_base as base
class Storage:
def __init__(self):
self.list = []
self.list_lock = threading.RLock()
self.zmq_server = None
self.host = '127.0.0.1'
self.port = 5432
self.bind_addr = (self.host, self.port)
def setup(self):
handlers = {base.TYPE_A: [self. remove]}
self.zmq_server = server.ZmqServerMeta(handlers=handlers, bind_addr=self.bind_addr)
self.zmq_server.start()
def add(self, data):
with self.list_lock:
try:
self.list.append(data)
except:
print "Didn't work"
def remove(self, msg):
with self.list_lock:
try:
self.list.remove(msg)
except:
print "Didn't work"
的想法是,那類存儲一些全局它接受到的信息。 它是在一個文件中的所有開始測試:
import sys
import time
import storage
import zmq_base as base
import zmq_server_meta as server
def printMsg(msg):
print msg
store = storage.Storage()
store.setup()
handlers = {base.TYPE_B: [printMsg]}
client = server.ZmqServerMeta(handlers=handlers, bind_addr=('127.0.0.1', 5431))
client.start()
message = "Test"
store.add(message)
client.send(base.TYPE_A, message, '127.0.0.1', 5432)
我簡化它以減少混亂。它不是僅僅添加它,而是通常發送然後回覆。響應,客戶端發送,應該通過正確的回調remove()來處理,並且它應該從列表中刪除某些內容。發生的問題是,remove()函數會看到一個空列表,儘管列表中應該有一個元素。如果從測試文件中檢查,我可以在添加元素後看到該元素,如果從那裏調用remove(),則會看到一個非空列表並可以將其刪除。我的問題是,爲什麼回調看到一個空的列表,我如何確保它看到列表中的正確元素?
親切的問候 帕特里克