2015-03-25 64 views
0

這是一個簡單的req-rep服務。通常zmq.REP和zmq.REQ就足夠了,但這不是我正在構建的應用程序。以下是執行客戶端服務器通信的腳本。服務器在一個線程中運行。溝通按預期工作。客戶端發送消息,服務器接收消息,然後向接收消息的客戶端發送消息。ZeroMQ路由器和經銷商是否必須在相同的(Python)過程中才能實現雙向通信?

import time 
from threading import Thread 
import zmq 

def worker_thread(): 
    cxt = zmq.Context.instance() 
    worker = cxt.socket(zmq.DEALER) 
    worker.setsockopt(zmq.IDENTITY, 'A') 
    worker.connect("tcp://127.0.0.1:5559") 

    for _ in range(10): 
     request = worker.recv() 
     print 'worker recieved' 
     worker.send_multipart(['A', "data_recieved"]) 

cxt = zmq.Context.instance() 
client = cxt.socket(zmq.ROUTER) 
client.bind('tcp://127.0.0.1:5559') 

Thread(target=worker_thread).start() 
time.sleep(2) 

for _ in range(10): 
    client.send_multipart(['A', 'data']) 
    request = client.recv() 
    print 'worker responded' 

當此代碼被分成兩個腳本,客戶端和服務器時,通信失敗。客戶端發送消息,但服務器從不接收消息。代碼如下:

客戶端:

import time 
import zmq 

cxt = zmq.Context.instance() 
client = cxt.socket(zmq.ROUTER) 
client.bind('tcp://127.0.0.1:5559') 

for _ in range(10): 
    client.send_multipart(['A', 'data']) 
    request = client.recv() 
    print 'worker responded' 

服務器:

import time 
import zmq 

cxt = zmq.Context.instance() 
worker = cxt.socket(zmq.DEALER) 
worker.setsockopt(zmq.IDENTITY, 'A') 
worker.connect("tcp://127.0.0.1:5559") 

for _ in range(10): 
    request = worker.recv() 
    print 'worker recieved' 
    worker.send_multipart(['A', "data_recieved"]) 

我能想到的唯一的原因,可能是導致失敗的溝通就是zmq.ROUTER和zmq.DEALER在單獨的腳本中運行,因此獨立的進程。

+0

不,它們可以是在單獨的進程,獨立的可執行文件,在不同的計算機上或獨立的網絡。其他一些問題 – Jason 2015-03-25 18:30:31

回答

0

我想你只是錯誤地顛倒了兩種套接字類型。

路由器套接字的行爲就像一個Rep套接字,而經銷商的行爲像一個Req。查看詳情:http://zeromq.org/tutorials:dealer-and-router

路由器和經銷商是特殊的意義上說,他們允許異步消息,其中要求/代表專門用於同步交換。我不知道你的第一個腳本爲什麼可以工作,但你應該嘗試反轉套接字。

這裏是C#中的一個例子,它模仿了你的python。這是兩個獨立的過程。

方法A:

const string Endpoint = "tcp://127.0.0.1:5559"; 

void Main() 
{ 
    using (var ctx = NetMQContext.Create()) 
    using (var worker = ctx.CreateRouterSocket()) 
    { 
     worker.Connect(Endpoint); 

     for (int i = 0; i < 10; i++) 
     { 
      NetMQMessage message = router.ReceiveMessage(); 
      Console.WriteLine("#{0}: worker received: {1}", i, string.Join(", ", message.Select(t => t.ConvertToString()))); 
      message.Clear(); 
      message.Append("A"); 
      message.Append("data_recieved"); 
      worker.SendMessage(message); 
     } 
    } 
} 

流程B:

const string Endpoint = "tcp://127.0.0.1:5559"; 

void Main() 
{ 
    using (var ctx = NetMQContext.Create()) 
    using (var client = ctx.CreateDealerSocket()) 
    { 
     client.Options.Identity = Encoding.ASCII.GetBytes("A"); 
     client.Bind(Endpoint); 

     var message = new NetMQMessage(); 
     for (int i = 0; i < 10; i++) 
     { 
      message.Clear(); 
      message.Append("A"); 
      message.Append("data"); 
      client.SendMessage(message); 
      var response = client.ReceiveMessage(); 
      Console.WriteLine("#{0}: worker responded", i); 
     } 
    } 
} 

輸出似乎是你期待什麼:

方法A:

#0: worker received: A, A, data 
#1: worker received: A, A, data 
#2: worker received: A, A, data 
#3: worker received: A, A, data 
#4: worker received: A, A, data 
#5: worker received: A, A, data 
#6: worker received: A, A, data 
#7: worker received: A, A, data 
#8: worker received: A, A, data 
#9: worker received: A, A, data 

過程B:

#0: worker responded 
#1: worker responded 
#2: worker responded 
#3: worker responded 
#4: worker responded 
#5: worker responded 
#6: worker responded 
#7: worker responded 
#8: worker responded 
#9: worker responded 

乾杯

相關問題