2015-06-05 47 views
0

我試圖如下面的圖描述了應用該推/拉模式:ZMQ設備,ioloop和多處理

      | PULL ---> Send via HTTP 
          | PULL ---> Send via HTTP 
---- PUSH ----- DEVICE ---- | PULL ---> Send via HTTP 
          | PULL ---> Send via HTTP 
          | PULL ---> Send via HTTP 

PUSH插座連接到ZeroMQ裝置併發射其然後傳播到消息全部連接PULL插座。我想實現的是一種在管道中的多個節點上的並行處理。 當處理由PULL套接字完成時,它應該通過HTTP將消息轉發到遠程端點。

下面是代碼:

from multiprocessing import Process 
import random 
import time 
import zmq 
from zmq.devices import ProcessDevice 

from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 

ioloop.install() 


bind_in_port = 5559 
bind_out_port = 5560 

dev = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH) 
dev.bind_in("tcp://127.0.0.1:%d" % bind_in_port) 
dev.bind_out("tcp://127.0.0.1:%d" % bind_out_port) 
dev.setsockopt_in(zmq.IDENTITY, b'PULL') 
dev.setsockopt_out(zmq.IDENTITY, b'PUSH') 
dev.start() 
time.sleep(2) 


def push(): 
    context = zmq.Context() 
    socket = context.socket(zmq.PUSH) 
    socket.connect("tcp://127.0.0.1:%s" % bind_in_port) 
    server_id = random.randrange(1,10005) 
    for i in range(5): 
     print("Message %d sent" % i) 
     socket.send_string("Push from %s" % server_id) 


def pull(): 
    context = zmq.Context() 
    socket = context.socket(zmq.PULL) 
    socket.connect("tcp://127.0.0.1:%s" % bind_out_port) 
    loop = ioloop.IOLoop.instance() 

    pull_stream = ZMQStream(socket, loop) 

    def on_recv(message): 
     print(message) 
    pull_stream.on_recv(on_recv) 

    loop.start() 

Process(target=push).start() 

time.sleep(2) 

for i in range(2): 
    Process(target=pull).start() 

雖然消息被正確發送到ZeroMQ設備,我看不出收到任何消息 - on_recv回調是永遠不會被調用。

任何幫助表示讚賞。

感謝

回答

0

有一個在設備初始化上述失蹤代碼來提供一個完整的答案。 dev和* port是什麼? 1個東西可能是添加睡眠(1)後連接端口穩定 注意:無需設置身份推/拉

+0

謝謝Dany。添加睡眠呼叫已解決了該問題。 – Nedo

+0

替代(更好?)解決方案是實現握手。例如發送\ x00直到它收到,然後另一個發送迴應像\ x01這樣的預定答案。那麼你可以放心地認爲它已經連接。保持它捕捉斷開連接。但這超出了原始問題的範圍:) –