0
我試圖如下面的圖描述了應用該推/拉模式:ZMQ設備,ioloop和多處理
| PULL ---> Send via HTTP
| PULL ---> Send via HTTP
---- PUSH ----- DEVICE ---- | PULL ---> Send via HTTP
| PULL ---> Send via HTTP
| PULL ---> Send via HTTP
的PUSH
插座連接到ZeroMQ裝置併發射其然後傳播到消息全部連接PULL
插座。我想實現的是一種在管道中的多個節點上的並行處理。 當處理由PULL
套接字完成時,它應該通過HTTP將消息轉發到遠程端點。
下面是代碼:
from multiprocessing import Process
import random
import time
import zmq
from zmq.devices import ProcessDevice
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
bind_in_port = 5559
bind_out_port = 5560
dev = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
dev.bind_in("tcp://127.0.0.1:%d" % bind_in_port)
dev.bind_out("tcp://127.0.0.1:%d" % bind_out_port)
dev.setsockopt_in(zmq.IDENTITY, b'PULL')
dev.setsockopt_out(zmq.IDENTITY, b'PUSH')
dev.start()
time.sleep(2)
def push():
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:%s" % bind_in_port)
server_id = random.randrange(1,10005)
for i in range(5):
print("Message %d sent" % i)
socket.send_string("Push from %s" % server_id)
def pull():
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:%s" % bind_out_port)
loop = ioloop.IOLoop.instance()
pull_stream = ZMQStream(socket, loop)
def on_recv(message):
print(message)
pull_stream.on_recv(on_recv)
loop.start()
Process(target=push).start()
time.sleep(2)
for i in range(2):
Process(target=pull).start()
雖然消息被正確發送到ZeroMQ設備,我看不出收到任何消息 - on_recv
回調是永遠不會被調用。
任何幫助表示讚賞。
感謝
謝謝Dany。添加睡眠呼叫已解決了該問題。 – Nedo
替代(更好?)解決方案是實現握手。例如發送\ x00直到它收到,然後另一個發送迴應像\ x01這樣的預定答案。那麼你可以放心地認爲它已經連接。保持它捕捉斷開連接。但這超出了原始問題的範圍:) –