2014-11-21 20 views

回答

0

這裏是與zeroMQ PUB SUB插座龍捲風HTTP服務器的例子。

#!/usr/bin/env python 
import json 
import tornado 
import tornado.web 
import zmq 
from tornado import httpserver 
from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 

ioloop.install() 
tornado.ioloop = ioloop 
import sys 


def ping_remote(): 
    """callback to keep the connection with remote server alive while we wait 
    Network routers between raspberry pie and cloud server will close the socket 
    if there is no data exchanged for long time. 
    """ 
    pub_inst.send_json_data(msg="Ping", req_id="##") 
    sys.stdout.write('.') 
    sys.stdout.flush() 


pending_requests = {} 


class ZMQSub(object): 
    def __init__(self, callback): 
     self.callback = callback 
     context = zmq.Context() 
     socket = context.socket(zmq.SUB) 
     # socket.connect('tcp://127.0.0.1:5559') 
     socket.bind('tcp://*:8081') 
     self.stream = ZMQStream(socket) 
     self.stream.on_recv(self.callback) 
     socket.setsockopt(zmq.SUBSCRIBE, "") 

    def shutdown_zmq_sub(self): 
     self.stream.close() 


class ZMQPub(object): 
    def __init__(self): 
     context = zmq.Context() 
     socket = context.socket(zmq.PUB) 
     socket.bind('tcp://*:8082') 
     self.publish_stream = ZMQStream(socket) 

    def send_json_data(self, msg, req_id): 
     topic = str(req_id) 
     self.publish_stream.send_multipart([topic, msg]) 

    def shutdown_zmq_sub(self): 
     self.publish_stream.close() 


def SensorCb(msg): 
    # decode message from raspberry pie and the channel ID. 
    key, msg = (i for i in msg) 
    if not key == "##": 
     msg = json.loads(msg) 

     if key in pending_requests.keys(): 
      req_inst = pending_requests[key] 
      req_inst.write(msg) 
      req_inst.finish() 
      del pending_requests[key] 
     else: 
      print "no such request" 
      print pending_requests 
    else: 
     print "received ping" 


class Handler(tornado.web.RequestHandler): 
    def __init__(self, *args, **kwargs): 
     super(Handler, self).__init__(*args, **kwargs) 

     # get the unique req id 
     self.req_id = str(self.application.req_id) + "#" 
     self.application.req_id += 1 

     # set headers 
     self.set_header("Access-Control-Allow-Origin", "*") 
     self.set_header("Access-Control-Allow-Headers", "x-requested-with") 
     self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS, PUT') 

    @tornado.web.asynchronous 
    def get(self): 
     print self.request 
     if self.req_id not in pending_requests.keys(): 
      pending_requests[self.req_id] = self 
     else: 
      print "WTF" 
     pub_inst.send_json_data(msg=json.dumps({"op": "ServiceCall"}), req_id=self.req_id) 


if __name__ == "__main__": 
    pub_inst = ZMQPub() 
    sub_inst = ZMQSub(callback=SensorCb) 
    application = tornado.web.Application(
     [(r'/get_sensor_data', Handler), (r'/(.*)')]) 
    application.req_id = 0 
    server = httpserver.HTTPServer(application,) 
    port = 8080 
    server.listen(port) 
    print "Sensor server ready on port: ", port 
    ping = ioloop.PeriodicCallback(ping_remote, 3000) 
    ping.start() 
    tornado.ioloop.IOLoop.instance().start() 
相關問題