2016-07-13 21 views
1

我遇到了ZeroMQ的一個奇怪的行爲,我一直試圖調試整天。ZMQ:REQ/REP失敗,有多個併發請求和輪詢

這是一個重現問題的最小示例腳本。它可以使用Python3運行。

啓動一個帶有REP套接字的服務器,並且五個帶有REP套接字的客戶端基本上同時連接到它。結果是服務器在前幾條消息之後由於某種原因開始阻塞。看起來好像poller.poll(1000)是無限期阻止的。

此行爲似乎也是與時間有關的。在啓動客戶端的循環中插入sleep(0.1),並按預期工作。

我本來希望REP套接字將所有傳入消息排隊並通過sock.recv_multipart()一個接一個地釋放它們。

這裏發生了什麼?

import logging 
from threading import Thread 
from time import sleep 
import zmq 

logging.basicConfig(level=logging.INFO) 
PORT = "3446" 
stop_flag = False 

def server(): 

    logging.info("started server") 
    context = zmq.Context() 
    sock = context.socket(zmq.REP) 
    sock.bind("tcp://*:" + PORT) 
    logging.info("bound server") 

    poller = zmq.Poller() 
    poller.register(sock, zmq.POLLIN) 

    while not stop_flag: 

     socks = dict(poller.poll(1000)) 
     if socks.get(sock) == zmq.POLLIN: 

      request = sock.recv_multipart() 
      logging.info("received %s", request) 
      # sleep(0.5) 

      sock.send_multipart(["reply".encode()] + request) 

    sock.close() 

def client(name:str): 
    context = zmq.Context() 
    sock = context.socket(zmq.REQ) 
    sock.connect("tcp://localhost:" + PORT) 
    sock.send_multipart([name.encode()]) 
    logging.info(sock.recv_multipart()) 
    sock.close() 

logging.info("starting server") 
server_thread = Thread(target=server) 
server_thread.start() 
sleep(1) 

nr_of_clients = 5 
for i in range(nr_of_clients): 
    Thread(target=client, args=[str(i)]).start() 

stop_flag = True 

回答

0

對我來說,問題似乎是所有客戶端已經收到他們的答覆之前,你是「關停」的服務器。所以我想它不是服務器阻止,但客戶端。

您設置stop_flag之前,您可以通過解決這個等待一段時間:

sleep(5) 
stop_flag = True 

,或者更好,你明確地加入客戶端線程,如:

nr_of_clients = 5 
threads = [] 
for i in range(nr_of_clients): 
    thread = Thread(target=client, args=[str(i)]) 
    thread.start() 
    threads.append(thread) 

for thread in threads: 
    thread.join() 

stop_flag = True