2016-09-22 15 views
0


我想讀取數據從6個不同的程序上的套接字一次爲perfomance的緣故。我做了一個測試,打開6個線程,並從每個測試套接字讀取一個測試,並打開6個子過程並讀取另一個測試套接字。線程讀取工作正常,它看起來像這樣:recv_pyobj表現不同線程對進程

class ZMQServer: 
    context = zmq.Context() 
    socket = None 
    ZMQthread = None 

    def __init__(self, port, max_size): 
     self.socket = self.context.socket(zmq.SUB) 
     self.socket.setsockopt(zmq.SUBSCRIBE, '') 
     self.socket.connect("tcp://127.0.0.1:" + str(port)) 

    def StartAsync(self): 
     ZMQthread = threading.Thread(target=self.Start) 
     ZMQthread.start() 

    def Start(self): 
     print "ZMQServer:Wait for next request from client on port: %d" % self.port 
     while True: 
      print "Running another loop" 
      try: 
       message = self.socket.recv_pyobj() 
      except: 
       print "ZMQServer:Error receiving messages" 

if __name__ == '__main__': 
    zmqServers = [None] * 6 
    for idx in range (0, 6): 
     zmqServers[idx] = ZMQServer(DTypes.PORTS_RECREGISTER[idx], 1024) 
     zmqServers[idx].StartAsync() 

這將顯示:

ZMQServer:等待來自客戶端的端口下一個請求:4994
運行另一個循環
ZMQServer:等待來自客戶端的下一個請求端口:4995
運行另一個循環
ZMQServer:等待來自客戶端的下一個端口請求:4996
運行anoth呃環
ZMQServer:等待來自客戶端的端口下一個請求:4997
運行另一個循環
ZMQServer:等待來自客戶端的端口下一個請求:4998
運行另一個循環
ZMQServer:等待來自客戶端下一個請求在端口上:4999

重要提示:我收到關於socket的數據我發送它。
現在,我需要行爲相同的行爲,但只有線程使用進程,所以八核會使用更多的處理器。代碼看起來是這樣的:

context = zmq.Context() 
def CreateSocket(port): 
    socket = context.socket(zmq.SUB) 
    socket.setsockopt(zmq.SUBSCRIBE, '') 
    socket.connect("tcp://127.0.0.1:" + str(port)) 

def Listen(socket, port): 
    print "ZMQServer:Wait for next request from client on port: %d" % port 
    while True: 
     print "Running another loop" 
     try: 
      message = socket.recv_pyobj() 
      print "ZMQServer:Received request: %s" % message 
     except: 
      print "ZMQServer:Error receiving messages" 
      continue 

    #I'm trying first only with 1 Process - 1 socket: 

    if __name__ == '__main__': 
     port = DTypes.PORTS_RECREGISTER[0] 
     socket = CreateSocket(port) 
     proc = Process(target=Listen, args=(socket, port)) 
     proc.start() 
     proc.join() 

輸出是奇怪:

ZMQServer:等待從客戶端上的端口下一個請求:4994
運行另一個循環
ZMQServer:錯誤接收消息
運行另一個循環
ZMQServer:接收消息時出錯
運行另一個循環
ZMQServer:錯誤接收消息
............

也是非常重要的是,我沒有收到套接字上的數據時,我把它
那麼,從什麼我可以理解:
1.方法進入每一個while循環socket換?
2.或者recv_pyobj不再被阻塞?
以前有人遇到過嗎?有人知道如何正確執行多進程套接字讀取嗎?
謝謝

回答

0

實際上,問題是因爲SOCKET類型參數傳遞給進程/線程。看起來酸洗,幕後的序列化傳遞給回調方法的數據不會很好地序列化SOCKET參數。
所以,我改變了它,所以我不會傳遞套接字參數:

context = zmq.Context() 

def ListeToSocket(port): 
    socket = context.socket(zmq.SUB) 
    socket.setsockopt(zmq.SUBSCRIBE, '') 
    socket.connect("tcp://127.0.0.1:" + str(port)) 
    print "Wait for data on port: %d" % port 
    while True: 
     try: 
      message = socket.recv_pyobj() 
      print "PORT:%d Received data: %s" % (port, message) 
     except: 
      print "PORT:%d: Error receiving messages" % port 

if __name__ == '__main__': 
    for idx in range(0, DTypes.SOCKET_MAX): 
     proc1 = Process(target=ListeToSocket, args=(DTypes.PORTS_RECREGISTER[idx],)) 
     proc1.start()