2011-12-11 77 views
8

我使用Redis的與我同ASYC客戶Brukva龍捲風應用程序一起,當我看着在Brukva網站上的示例應用程序,他們正在製造的WebSocket的「初始化」的方法新的連接在Tornado中處理Redis連接的正確方法是什麼? (異步 - 發佈/訂閱)

class MessagesCatcher(tornado.websocket.WebSocketHandler): 
    def __init__(self, *args, **kwargs): 
     super(MessagesCatcher, self).__init__(*args, **kwargs) 
     self.client = brukva.Client() 
     self.client.connect() 
     self.client.subscribe('test_channel') 

    def open(self): 
     self.client.listen(self.on_message) 

    def on_message(self, result): 
     self.write_message(str(result.body)) 

    def close(self): 
     self.client.unsubscribe('test_channel') 
     self.client.disconnect() 

它在websocket的情況下很好,但如何處理它在普通的Tornado RequestHandler post方法中說長查詢操作(發佈 - 訂閱模型)。我正在更新處理程序的每個後期方法中的新客戶端連接是這種正確的方法?當我在Redis控制檯進行檢查時,發現客戶在每次新的操作後都會增加。

enter image description here

這裏是我的代碼示例。

c = brukva.Client(host = '127.0.0.1') 
c.connect() 

class MessageNewHandler(BaseHandler): 
    @tornado.web.authenticated 
    def post(self): 

     self.listing_id = self.get_argument("listing_id") 
     message = { 
      "id": str(uuid.uuid4()), 
      "from": str(self.get_secure_cookie("username")), 
      "body": str(self.get_argument("body")), 
     } 
     message["html"] = self.render_string("message.html", message=message) 

     if self.get_argument("next", None): 
      self.redirect(self.get_argument("next")) 
     else: 
      c.publish(self.listing_id, message) 
      logging.info("Writing message : " + json.dumps(message)) 
      self.write(json.dumps(message)) 

    class MessageUpdatesHandler(BaseHandler): 
     @tornado.web.authenticated 
     @tornado.web.asynchronous 
     def post(self): 
      self.listing_id = self.get_argument("listing_id", None) 
      self.client = brukva.Client() 
      self.client.connect() 
      self.client.subscribe(self.listing_id) 
      self.client.listen(self.on_new_messages) 

     def on_new_messages(self, messages): 
      # Closed client connection 
      if self.request.connection.stream.closed(): 
       return 
      logging.info("Getting update : " + json.dumps(messages.body)) 
      self.finish(json.dumps(messages.body)) 
      self.client.unsubscribe(self.listing_id) 


     def on_connection_close(self): 
      # unsubscribe user from channel 
      self.client.unsubscribe(self.listing_id) 
      self.client.disconnect() 

我很感激你是否提供了一些類似案例的示例代碼。

+0

Python中使用Redis,ZMQ和Tornado的異步PubSub - https://github.com/abhinavsingh/async_pubsub –

回答

2

您應該在您的應用中彙集連接。因爲它看起來像布魯克瓦不會自動支持(redis-py支持這個,但由於本質阻塞,所以它不適合龍捲風),你需要編寫自己的連接池。

雖然這個模式很簡單。 (這不是真正的操作代碼):

class BrukvaPool(): 

    __conns = {} 


    def get(host, port,db): 
     ''' Get a client for host, port, db ''' 

     key = "%s:%s:%s" % (host, port, db) 

     conns = self.__conns.get(key, []) 
     if conns: 
      ret = conns.pop() 
      return ret 
     else: 
      ## Init brukva client here and connect it 

    def release(client): 
     ''' release a client at the end of a request ''' 
     key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db) 
     self.__conns.setdefault(key, []).append(client) 

它可能有點棘手,但這是主要想法。

9

有點遲,但我一直在使用tornado-redis。它的工作原理與龍捲風的ioloop和tornado.gen模塊

安裝tornadoredis

可以從點子

pip install tornadoredis 

或setuptools的

easy_install tornadoredis 

安裝好,但你真的不應該去做。您也可以克隆存儲庫並提取它。然後運行

python setup.py build 
python setup.py install 

連接到Redis的

下面的代碼放在您的main.py或同等

redis_conn = tornadoredis.Client('hostname', 'port') 
redis_conn.connect() 

redis.connect只調用一次。這是一個阻塞呼叫,所以它應該在啓動主Ioloop之前調用。所有處理程序之間共享相同的連接對象。

你可以把它添加到您的應用程序設置,如

settings = { 
    redis = redis_conn 
} 
app = tornado.web.Application([('/.*', Handler),], 
           **settings) 

使用tornadoredis

的連接可以處理在self.settings['redis']使用,或者它可以被添加爲BaseHandler和子類的屬性該類用於其他請求處理程序。

class BaseHandler(tornado.web.RequestHandler): 

    @property 
    def redis(): 
     return self.settings['redis'] 

要使用redis的通信,tornado.web.asynchronoustornado.gen.engine裝飾使用

class SomeHandler(BaseHandler): 

    @tornado.web.asynchronous 
    @tornado.gen.engine 
    def get(self): 
     foo = yield gen.Task(self.redis.get, 'foo') 
     self.render('sometemplate.html', {'foo': foo} 

的額外信息

更多的實施例和其他的功能,如連接池和管道可以在找到github回購。

相關問題