用 ZeroMQ 來做這件事應該不會太難。以下是我會採用的做法。
在雲端編寫一個 Tornado HTTP 服務器,並綁定兩個端口,分別用於 ZeroMQ 的 PUB 和 SUB 套接字。服務器爲用戶提供 REST 端點。在樹莓派上,將兩個 ZeroMQ 套接字(PUB 和 SUB)連接到雲服務器。
當您在雲服務器上收到請求時,請通過 PUB 套接字發送命令,並在 ZeroMQ SUB 套接字上設置接收數據的回調。樹莓派收到數據時,運行您的傳感器代碼;如果有回覆,則通過樹莓派的 PUB 套接字發送。在服務器上,回調會把回覆返回給用戶。
下面是服務器(雲)和客戶端(樹莓派)的代碼片段來做到這一點。
在雲服務器上運行。
#!/usr/bin/env python
import json
import tornado
import tornado.web
import zmq
from tornado import httpserver
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
tornado.ioloop = ioloop
import sys
def ping_remote():
    """Publish a keep-alive frame to the Raspberry Pi and print a progress dot.

    Network routers between the Raspberry Pi and the cloud server will close
    the socket if no data is exchanged for a long time, so a small "Ping"
    message is sent on the reserved topic "##" to keep the link busy.
    """
    pub_inst.send_json_data(req_id="##", msg="Ping")
    # Emit the dot right away instead of waiting for a newline to flush it.
    stdout = sys.stdout
    stdout.write('.')
    stdout.flush()
# Maps a request id (also used as the ZeroMQ topic) to the Handler instance
# that is still waiting for the Raspberry Pi's reply.
pending_requests = dict()
class ZMQSub(object):
    """SUB socket bound on the cloud server; receives messages from the Pi.

    Each multipart message that arrives is handed to ``callback`` through a
    ZMQStream.
    """

    def __init__(self, callback, endpoint='tcp://*:8081'):
        """Bind the SUB socket and register ``callback`` for incoming messages.

        callback: callable invoked with the list of frames of each message.
        endpoint: address to bind to. The default keeps the original
            hard-coded port.
        """
        self.callback = callback
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        socket.bind(endpoint)
        # An empty prefix subscribes to every topic. pyzmq expects a byte
        # string for this option; b"" is identical to "" on Python 2 and
        # also valid on Python 3.
        socket.setsockopt(zmq.SUBSCRIBE, b"")
        self.stream = ZMQStream(socket)
        self.stream.on_recv(self.callback)

    def shutdown_zmq_sub(self):
        """Close the receiving stream."""
        self.stream.close()
class ZMQPub(object):
    """PUB socket bound on the cloud server; publishes messages to the Pi."""

    def __init__(self, endpoint='tcp://*:8082'):
        """Bind the PUB socket.

        endpoint: address to bind to. The default keeps the original
            hard-coded port.
        """
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.bind(endpoint)
        self.publish_stream = ZMQStream(socket)

    @staticmethod
    def _to_frame(value):
        """Return ``value`` as a byte string suitable for a ZeroMQ frame."""
        if not isinstance(value, (bytes, type(u""))):
            # e.g. an integer request id
            value = str(value)
        if isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    def send_json_data(self, msg, req_id):
        """Publish ``msg`` under topic ``req_id`` as a two-frame message.

        Despite the name, no JSON encoding is done here: ``msg`` is sent as
        given, so callers pass an already-serialised payload.
        """
        frames = [self._to_frame(req_id), self._to_frame(msg)]
        self.publish_stream.send_multipart(frames)

    def shutdown_zmq_pub(self):
        """Close the publishing stream."""
        self.publish_stream.close()

    # The original method name was copy-pasted from ZMQSub; it is kept as an
    # alias so existing callers continue to work.
    shutdown_zmq_sub = shutdown_zmq_pub
def SensorCb(msg):
    """Deliver a reply from the Raspberry Pi to the HTTP request awaiting it.

    msg: the frames of one multipart message, ``[request id, payload]``.

    Messages whose id is the reserved "##" are keep-alive pings and are only
    logged. For any other id the payload is written to the matching entry of
    ``pending_requests`` and that request is finished.
    """
    key, payload = msg
    if not isinstance(key, str):
        # Frames are byte strings, while request ids are stored as ``str``.
        key = key.decode("utf-8")
    if key == "##":
        print("received ping")
        return

    try:
        payload = json.loads(payload)
    except ValueError:
        # The reply is not JSON (for example a plain-text message). Forward
        # it unchanged rather than raising inside the callback, which would
        # leave the HTTP request open forever.
        pass

    req_inst = pending_requests.pop(key, None)
    if req_inst is None:
        print("no such request")
        print(pending_requests)
        return
    req_inst.write(payload)
    req_inst.finish()
class Handler(tornado.web.RequestHandler):
    """GET endpoint that forwards a service call to the Pi and awaits the reply.

    The request is parked in ``pending_requests`` under a unique id and is
    finished later by ``SensorCb`` when a message with the same id arrives.
    """

    def __init__(self, *args, **kwargs):
        super(Handler, self).__init__(*args, **kwargs)
        # Unique id for this request; it doubles as the ZeroMQ topic.
        self.req_id = str(self.application.req_id) + "#"
        self.application.req_id += 1

    def set_default_headers(self):
        """Set the CORS headers.

        Tornado calls this hook whenever it resets the response, so the
        headers are also present on error responses.
        """
        self.set_header("Access-Control-Allow-Origin", "*")
        self.set_header("Access-Control-Allow-Headers", "x-requested-with")
        self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS, PUT')

    @tornado.web.asynchronous
    def get(self):
        """Register this request as pending and publish the service call."""
        print(self.request)
        if self.req_id in pending_requests:
            # Should not happen: ids come from a per-application counter.
            print("WTF")
        else:
            pending_requests[self.req_id] = self
        pub_inst.send_json_data(
            msg=json.dumps({"op": "ServiceCall"}), req_id=self.req_id)

    def on_connection_close(self):
        """Forget this request if the client disconnects before a reply arrives.

        Without this the entry would stay in ``pending_requests`` for the
        lifetime of the process.
        """
        if pending_requests.get(self.req_id) is self:
            del pending_requests[self.req_id]
        super(Handler, self).on_connection_close()
if __name__ == "__main__":
    # These stay module-level names: ping_remote and Handler.get look up
    # pub_inst as a global.
    pub_inst = ZMQPub()
    sub_inst = ZMQSub(callback=SensorCb)

    # The original route list ended with a second entry, (r'/(.*)'), which
    # is just a string in parentheses rather than a (pattern, handler)
    # tuple and therefore is not a usable route. It is dropped here; add a
    # complete (pattern, handler) pair if a catch-all route is needed.
    application = tornado.web.Application([
        (r'/get_sensor_data', Handler),
    ])
    # Counter from which Handler derives its unique request ids.
    application.req_id = 0

    server = httpserver.HTTPServer(application)
    port = 8080
    server.listen(port)
    print("Sensor server ready on port: " + " " + str(port))

    # Send a keep-alive every 3000 ms.
    ping = ioloop.PeriodicCallback(ping_remote, 3000)
    ping.start()
    tornado.ioloop.IOLoop.instance().start()
在樹莓派(Raspberry Pi)上,運行以下客戶端。
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
import sys
ioloop.install()
def ping_remote():
    """Ping the server so the connection between server and client is kept."""
    # "##" is the reserved topic for pings.
    pub_inst.send_json_data(req_id="##", msg="Ping")
    out = sys.stdout
    out.write('.')
    out.flush()
# Request bookkeeping dict; nothing in the client code shown here reads or
# writes it.
pending_requests = dict()
class ZMQSub(object):
    """SUB socket on the Raspberry Pi; receives commands from the cloud server.

    Each multipart message that arrives is handed to ``callback`` through a
    ZMQStream.
    """

    def __init__(self, callback, endpoint='tcp://127.0.0.1:8082'):
        """Connect the SUB socket and register ``callback``.

        callback: callable invoked with the list of frames of each message.
        endpoint: address of the server's PUB socket. Replace the loopback
            default with the cloud server's IP address.
        """
        self.callback = callback
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        socket.connect(endpoint)
        # An empty prefix subscribes to every topic. pyzmq expects a byte
        # string for this option; b"" is identical to "" on Python 2 and
        # also valid on Python 3.
        socket.setsockopt(zmq.SUBSCRIBE, b"")
        self.stream = ZMQStream(socket)
        self.stream.on_recv(self.callback)

    def shutdown_zmq_sub(self):
        """Close the receiving stream."""
        self.stream.close()
class ZMQPub(object):
    """PUB socket on the Raspberry Pi; publishes replies to the cloud server."""

    def __init__(self, endpoint='tcp://127.0.0.1:8081'):
        """Connect the PUB socket.

        endpoint: address of the server's SUB socket. Replace the loopback
            default with the cloud server's IP address.
        """
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.connect(endpoint)
        self.publish_stream = ZMQStream(socket)

    @staticmethod
    def _to_frame(value):
        """Return ``value`` as a byte string suitable for a ZeroMQ frame."""
        if not isinstance(value, (bytes, type(u""))):
            # e.g. an integer request id
            value = str(value)
        if isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    def send_json_data(self, msg, req_id):
        """Publish ``msg`` under topic ``req_id`` as a two-frame message.

        Despite the name, no JSON encoding is done here: ``msg`` is sent as
        given, so callers pass an already-serialised payload.
        """
        frames = [self._to_frame(req_id), self._to_frame(msg)]
        self.publish_stream.send_multipart(frames)

    def shutdown_zmq_pub(self):
        """Close the publishing stream."""
        self.publish_stream.close()

    # The original method name was copy-pasted from ZMQSub; it is kept as an
    # alias so existing callers continue to work.
    shutdown_zmq_sub = shutdown_zmq_pub
def SensorCb(msg):
    """Handle one message from the cloud server and publish the reply.

    msg: the frames of one multipart message, ``[request id, payload]``.

    Messages whose id is the reserved "##" are keep-alive pings and are only
    logged. For any other id a reply is published under the same id so the
    server can match it to the waiting request.
    """
    # Imported locally because this script's import block does not
    # include json.
    import json

    key, payload = msg
    if key in ("##", b"##"):
        print("received ping")
        return

    print("%s %s" % (key, payload))
    #####
    # Do your sensor specific work here and build a reply for the remote
    # server.
    #####
    # The server JSON-decodes every reply, so the reply has to be valid
    # JSON; a bare text string would make that decoding fail.
    resp = json.dumps("response from raspi")
    pub_inst.send_json_data(msg=resp, req_id=key)
if __name__ == "__main__":
    # Both sockets stay module-level names; ping_remote and SensorCb look up
    # pub_inst as a global.
    pub_inst = ZMQPub()
    sub_inst = ZMQSub(callback=SensorCb)

    # Send a keep-alive every 3000 ms, then hand control to the event loop.
    keepalive = ioloop.PeriodicCallback(ping_remote, 3000)
    keepalive.start()
    ioloop.IOLoop.instance().start()
意見:Tornado 可以很容易地做到你所需要的。數據可以通過 URL 或 POST 數據傳入,返回值可以是任意數據(例如,通常是 JSON 字典)。 – mdurant
是的,我同意你的意見。但通信需要是真正雙向的,以便服務器也可以向傳感器下發命令。看起來 Tornado 並不是爲真正的雙向管道而設計的。 –
我想說的是,在這種情況下,你所需要的雙向性是定期輪詢get命令端點。 – mdurant