2012-03-19 32 views
2

您好我有一些問題包裝Python類中的一些ZMQ拉客戶端。這些類被實例化並通過多處理模塊在子進程中調用。當客戶端功能全部有效時,但是當它們是類時,poller.poll()會掛起。ZMQ輪詢不在類實例中工作

代碼波紋管有兩個版本:一個可以工作,另一個不可以。爲什麼?

import zmq 
import time 
import sys 
import random 
from multiprocessing import Process 

def server_push(port="5556"): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUSH) 
    socket.bind("tcp://*:%s" % port) 
    print "Running server on port: ", port 
    # serves only 5 request and dies 
    for reqnum in range(10): 
     if reqnum < 6: 
      socket.send("Continue") 
     else: 
      socket.send("Exit") 
      break 
     time.sleep (1) 

def server_pub(port="5558"): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUB) 
    socket.bind("tcp://*:%s" % port) 
    publisher_id = random.randrange(0,9999) 
    print "Running server on port: ", port 
    # serves only 5 request and dies 
    for reqnum in range(10): 
     # Wait for next request from client 
     topic = random.randrange(8,10) 
     messagedata = "server#%s" % publisher_id 
     print "%s %s" % (topic, messagedata) 
     socket.send("%d %s" % (topic, messagedata)) 
     time.sleep(1)  


class Client: 
    def __init__(self,port_push, port_sub): 
     context = zmq.Context() 
     self.socket_pull = context.socket(zmq.PULL) 
     self.socket_pull.connect ("tcp://localhost:%s" % port_push) 
     print "Connected to server with port %s" % port_push 
     self.socket_sub = context.socket(zmq.SUB) 
     self.socket_sub.connect ("tcp://localhost:%s" % port_sub) 
     self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9") 
     print "Connected to publisher with port %s" % port_sub 
     # Initialize poll set 


    def __call__(self): 
     poller = zmq.Poller() 
     poller.register(self.socket_pull, zmq.POLLIN) 
     poller.register(self.socket_sub, zmq.POLLIN) 
     # Work on requests from both server and publisher 
     should_continue = True 
     print "listening" 
     while should_continue: 
      print "hello" 
      socks = dict(poller.poll()) 
      print poller 
      if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN: 
       message = self.socket_pull.recv() 
       print "Recieved control command: %s" % message 
       if message == "Exit": 
        print "Recieved exit command, client will stop recieving messages" 
        should_continue = False 

       if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN: 
        string = self.socket_sub.recv() 
        topic, messagedata = string.split() 
        print "Processing ... ", topic, messagedata 

def client(port_push, port_sub): 
    context = zmq.Context() 
    socket_pull = context.socket(zmq.PULL) 
    socket_pull.connect ("tcp://localhost:%s" % port_push) 
    print "Connected to server with port %s" % port_push 
    socket_sub = context.socket(zmq.SUB) 
    socket_sub.connect ("tcp://localhost:%s" % port_sub) 
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9") 
    print "Connected to publisher with port %s" % port_sub 
    # Initialize poll set 
    poller = zmq.Poller() 
    poller.register(socket_pull, zmq.POLLIN) 
    poller.register(socket_sub, zmq.POLLIN) 
    # Work on requests from both server and publisher 
    should_continue = True 
    while should_continue: 
     socks = dict(poller.poll()) 
     if socket_pull in socks and socks[socket_pull] == zmq.POLLIN: 
      message = socket_pull.recv() 
      print "Recieved control command: %s" % message 
      if message == "Exit": 
       print "Recieved exit command, client will stop recieving messages" 
       should_continue = False 

     if socket_sub in socks and socks[socket_sub] == zmq.POLLIN: 
      string = socket_sub.recv() 
      topic, messagedata = string.split() 
      print "Processing ... ", topic, messagedata 

if __name__ == "__main__": 
    # Now we can run a few servers 
    server_push_port = "5556" 
    server_pub_port = "5558" 
    Process(target=server_push, args=(server_push_port,)).start() 
    Process(target=server_pub, args=(server_pub_port,)).start() 
    #~ Process(target=client,args=(server_push_port,server_pub_port)).start() 
    Process(target=Client(server_push_port,server_pub_port)).start() 
+0

在與子模塊有限的工作我已經注意到,它可以難以診斷異常情況 - 你確定你已經正確診斷並且你沒有在其他地方發現異常嗎?我過去做過的一件事是明確地將子進程分解爲我可以在自己的控制檯窗口中監視的進程。 – 2012-03-19 13:26:05

+0

這只是一個測試代碼,沒有什麼隱藏的。無論是假想中斷,發生在pyzmq內。我希望有一個解決方法,以便不必等待pyzmq中的修復。 我同意使用多進程時很難調試,但它提供的便利超過了這個問題。 – fccoelho 2012-03-19 13:41:09

回答

2

EDIT1:這是不太正確的,給我一些時間來得到它的權利...

我想你可能會調用客戶端類的錯誤的方式。我不是這方面的專家,但我認爲你的客戶端應該從Process進行分類,然後使用.start()函數運行。所以,定義你的客戶端類是這樣的:

class Client(Process): 
    def __init__(self, port_push, port_sub): 
     (...) # your class init code here...make sure indentation is correct 

然後在您運行服務器端,創建您的客戶端類的實例並啓動它,像這樣:

client_class = Client(port_push, port_sub) 
client_class.start() 

EDIT2:這裏是編輯的fccoelho代碼版本適用於我。

最大的問題似乎是ZMQ初始化的東西需要在__call__方法中完成,而不是在__init__中完成。我懷疑這是由於如何在多處理中分配內存,因爲__init__函數將在父進程中完成,而__call__函數在具有單獨內存空間的子進程中完成。顯然ZMQ不喜歡這個。我還添加了一些等待時間來防止客戶端在服務器準備就緒之前連接到服務器,並防止服務器在客戶端訂閱之前發送消息。還使用127.0.0.1而不是本地主機(由於某種原因,我的電腦不喜歡localhost)。還刪除了客戶端輪詢調用周圍煩人的打印消息,並修復了客戶端在pubsub套接字上檢查輪詢結果的縮進問題。

import zmq 
import time 
import sys 
import random 
from multiprocessing import Process 

def server_push(port="5556"): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUSH) 
    socket.bind("tcp://127.0.0.1:%s" % port) 
    print "Running server on port: ", port 
    time.sleep(1.0) 
    # serves only 5 request and dies 
    for reqnum in range(10): 
     if reqnum < 6: 
      socket.send("Continue") 
     else: 
      socket.send("Exit") 
      print 'Push server sent "Exit" signal' 
      break 
     time.sleep(0.4) 

def server_pub(port="5558"): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUB) 
    socket.bind("tcp://127.0.0.1:%s" % port) 
    socket.setsockopt(zmq.HWM, 1000) 
    publisher_id = random.randrange(0,9999) 
    print "Running server on port: ", port 
    time.sleep(1.0) 
    # serves only 5 request and dies 
    for reqnum in range(10): 
     # Wait for next request from client 
     topic = random.randrange(8,10) 
     messagedata = "server#%s" % publisher_id 
     print "%s %s" % (topic, messagedata) 
     socket.send("%d %s" % (topic, messagedata)) 
     time.sleep(0.4)  


class Client: 
    def __init__(self,port_push, port_sub): 
     self.port_push = port_push 
     self.port_sub = port_sub 
     # Initialize poll set 

    def __call__(self): 
     time.sleep(0.5) 
     print 'hello from class client!' 
     context = zmq.Context() 
     self.socket_pull = context.socket(zmq.PULL) 
     self.socket_pull.connect ("tcp://127.0.0.1:%s" % self.port_push) 
     print "Connected to server with port %s" % self.port_push 
     self.socket_sub = context.socket(zmq.SUB) 
     self.socket_sub.connect ("tcp://127.0.0.1:%s" % self.port_sub) 
     self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9") 
     print "Connected to publisher with port %s" % self.port_sub 

     poller = zmq.Poller() 
     poller.register(self.socket_pull, zmq.POLLIN) 
     poller.register(self.socket_sub, zmq.POLLIN) 
     # Work on requests from both server and publisher 
     should_continue = True 
     print "listening" 
     while should_continue: 
      # print "hello" 
      socks = dict(poller.poll()) 
      # print poller 
      if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN: 
       message = self.socket_pull.recv() 
       print "Recieved control command: %s" % message 
       if message == "Exit": 
        print "Recieved exit command, client will stop recieving messages" 
        should_continue = False 

      if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN: 
       string = self.socket_sub.recv() 
       topic, messagedata = string.split() 
       print "Processing ... ", topic, messagedata 

def client(port_push, port_sub): 
    print 'hello from function client!' 
    context = zmq.Context() 
    socket_pull = context.socket(zmq.PULL) 
    socket_pull.connect ("tcp://127.0.0.1:%s" % port_push) 
    print "Connected to server with port %s" % port_push 
    socket_sub = context.socket(zmq.SUB) 
    socket_sub.connect ("tcp://127.0.0.1:%s" % port_sub) 
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9") 
    print "Connected to publisher with port %s" % port_sub 
    # Initialize poll set 
    poller = zmq.Poller() 
    poller.register(socket_pull, zmq.POLLIN) 
    poller.register(socket_sub, zmq.POLLIN) 
    # Work on requests from both server and publisher 
    should_continue = True 
    while should_continue: 
     socks = dict(poller.poll(1000)) 
     if socket_pull in socks and socks[socket_pull] == zmq.POLLIN: 
      message = socket_pull.recv() 
      print "Recieved control command: %s" % message 
      if message == "Exit": 
       print "Recieved exit command, client will stop recieving messages" 
       should_continue = False 

     if socket_sub in socks and socks[socket_sub] == zmq.POLLIN: 
      string = socket_sub.recv() 
      topic, messagedata = string.split() 
      print "Processing ... ", topic, messagedata 

if __name__ == "__main__": 
    # Now we can run a few servers 
    server_push_port = "5556" 
    server_pub_port = "5558" 
    Process(target=server_push, args=(server_push_port,)).start() 
    Process(target=server_pub, args=(server_pub_port,)).start() 
    # Process(target=client,args=(server_push_port,server_pub_port)).start() 
    Process(target=Client(server_push_port,server_pub_port)).start() 

最後,這裏有一個更清潔的實現多進程發佈訂閱的這是非常裸機,但更明確地說明了事情:

import zmq 
from multiprocessing import Process 
import time 

class ServerPubSub(Process): 
    def __init__(self, port, n): 
     Process.__init__(self) 
     self.port = port 
     self.n = n 

    def run(self): 
     self.context = zmq.Context() 
     self.pub = self.context.socket(zmq.PUB) 
     self.pub.bind('tcp://127.0.0.1:%d' % self.port) 
     self.pub.setsockopt(zmq.HWM, 1000) 

     time.sleep(1) 

     end = False 
     for i in range(self.n): 
      print 'SRV: sending message %d' % i 
      self.pub.send('Message %d' % i) 
      print 'SRV: message %d sent' % i 
      time.sleep(0.2) 

     self.pub.close() 

class ClientPubSub(Process): 
    def __init__(self, port, n): 
     Process.__init__(self) 
     self.port = port 
     self.n = n 

    def run(self): 
     self.context = zmq.Context() 
     self.sub = self.context.socket(zmq.SUB) 
     self.sub.connect('tcp://127.0.0.1:%d' % self.port) 
     self.sub.setsockopt(zmq.SUBSCRIBE, '') 
     self.poller = zmq.Poller() 
     self.poller.register(self.sub, zmq.POLLIN) 

     end = False 
     count = 0 
     while count < self.n: 
      ready = dict(self.poller.poll(0)) 
      if self.sub in ready and ready[self.sub] == zmq.POLLIN: 
       msg = self.sub.recv() 
       print 'CLI: received message "%s"' % msg 
       count += 1 

     self.sub.close() 

if __name__ == "__main__": 
    port = 5000 
    n = 10 
    server = ServerPubSub(port, n) 
    client = ClientPubSub(port, n) 

    server.start() 
    client.start() 

    server.join() 
    client.join() 
+0

這並沒有改變任何東西,我正在做的就是你的子類。當一個進程啓動時,目標可調用被調用。如果你運行上面的代碼,你會看到__call__方法被調用,因爲打印「聆聽」。這是對poller.poll()的調用,掛起... – fccoelho 2012-03-19 13:37:01

+0

當我第一次看到你的代碼時,你的縮進被搞砸了,它看起來像你正在定義客戶類以外的__call__。 – 2012-03-19 13:41:39

+0

不,縮進問題是當我將代碼粘貼到stackoverflow。我已經在這裏修復了它,但是我的代碼從一開始就正確縮進了 – fccoelho 2012-03-19 14:57:50