2012-10-12

Django: clean up redis connection after client disconnects from stream

I've implemented a Server-Sent Events API in my Django app to stream realtime updates from my backend to the browser. The backend is a Redis pubsub. My Django view looks like this:

def event_stream(request): 
    """ 
    Stream worker events out to browser. 
    """ 

    listener = events.Listener(
        settings.EVENTS_PUBSUB_URL, 
        channels=[settings.EVENTS_PUBSUB_CHANNEL], 
        buffer_key=settings.EVENTS_BUFFER_KEY, 
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID') 
    ) 

    return http.HttpResponse(listener, mimetype='text/event-stream') 

And the events.Listener class that I'm returning as an iterable looks like this:

class Listener(object): 
    def __init__(self, rcon_or_url, channels, buffer_key=None, 
                 last_event_id=None): 
        if isinstance(rcon_or_url, redis.StrictRedis): 
            self.rcon = rcon_or_url 
        elif isinstance(rcon_or_url, basestring): 
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url)) 
        self.channels = channels 
        self.buffer_key = buffer_key 
        self.last_event_id = last_event_id 
        self.pubsub = self.rcon.pubsub() 
        self.pubsub.subscribe(channels) 

    def __iter__(self): 
        # If we've been initted with a buffer key, then get all the events off 
        # that and spew them out before blocking on the pubsub. 
        if self.buffer_key: 
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1) 

            # Check whether the msg with last_event_id is still in the buffer. 
            # If so, trim buffered_events to contain only newer messages. 
            if self.last_event_id: 
                # Note that we're looping through the most recent messages 
                # first here. 
                counter = 0 
                for msg in buffered_events: 
                    if json.loads(msg)['id'] == self.last_event_id: 
                        break 
                    counter += 1 
                buffered_events = buffered_events[:counter] 

            for msg in reversed(list(buffered_events)): 
                # Stream out oldest messages first 
                yield to_sse({'data': msg}) 
        try: 
            for msg in self.pubsub.listen(): 
                if msg['type'] == 'message': 
                    yield to_sse(msg) 
        finally: 
            logging.info('Closing pubsub') 
            self.pubsub.close() 
            self.rcon.connection_pool.disconnect() 
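The buffer-trimming step can be exercised on its own. Here's a minimal sketch with made-up event IDs (the replay logic, isolated from Redis):

```python
import json

def trim_buffer(buffered_events, last_event_id):
    """Given buffered events newest-first, keep only those newer than
    last_event_id, returned oldest-first ready to stream out."""
    if last_event_id:
        counter = 0
        for msg in buffered_events:
            if json.loads(msg)['id'] == last_event_id:
                break
            counter += 1
        # If last_event_id wasn't found, counter == len() and we keep all.
        buffered_events = buffered_events[:counter]
    return list(reversed(buffered_events))

# Like Redis LRANGE output: ids 3, 2, 1 with 3 the most recent.
buf = [json.dumps({'id': i}) for i in (3, 2, 1)]
print(trim_buffer(buf, 1))  # events 2 and 3 survive, oldest first
```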

I'm able to stream events out to the browser just fine with this setup. However, it seems that the disconnect calls in the listener's `finally` never actually get called. I assume they're still blocked waiting for messages to come from the pubsub. As clients disconnect and reconnect, I can see the connection count on my Redis instance climbing and never going down. Once it gets to around 1000, Redis starts freaking out and consuming all the available CPU.

I would like to be able to detect when the client is no longer listening and close the Redis connection(s) at that time.

Things I've tried or thought about:

  1. A connection pool. But as the redis-py README states, "It is not safe to pass PubSub or Pipeline objects between threads."
  2. Middleware to handle the disconnects. This won't work because a middleware's process_response() method gets called too early (before http headers are even sent to the client). I need something that gets called when the client disconnects while I'm in the middle of streaming content to them.
  3. The request_finished and got_request_exception signals. The first, like process_response() in a middleware, seems to fire too soon. The second doesn't get called when a client disconnects mid-stream.

Final wrinkle: In production I'm using Gevent so I can get away with keeping a lot of connections open at once. However, this connection leak issue occurs whether I'm using plain old 'manage.py runserver', or Gevent's monkeypatched runserver, or Gunicorn's gevent workers.

Answer

UPDATE: As of Django 1.5, you'll need to return a StreamingHttpResponse instance if you want to lazily stream things out, as I'm doing in this question/answer.

ORIGINAL ANSWER BELOW

After a lot of banging on things and reading framework code, I've found what I think is the right answer to this question.

  1. According to the WSGI PEP, if your application returns an iterator with a close() method, it should be called by the WSGI server once the response has finished. Django supports this too. That's a natural place to do the Redis connection cleanup that I need.
  2. There's a bug in Python's wsgiref implementation, and by extension in Django's 'runserver', that causes close() to be skipped if the client disconnects from the server mid-stream. I've submitted a patch.
  3. Even if the server honors close(), it won't be called until a write to the client actually fails. If your iterator is blocked waiting on the pubsub and not sending anything, close() will never be called. I've worked around this by sending a no-op message into the pubsub each time a client connects. That way when a browser does a normal reconnect, the now-defunct threads will try to write to their closed connections, throw an exception, and then get cleaned up when the server calls close(). The SSE spec says that any line beginning with a colon is a comment that should be ignored, so I'm just sending ":\n" as my no-op message to flush out stale clients.
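The close() contract from point 1 can be seen in miniature with a toy server loop. This is an illustrative sketch, not Django's or wsgiref's actual code:

```python
class Stream(object):
    """A WSGI-style response iterable with cleanup in close()."""
    def __init__(self):
        self.closed = False

    def __iter__(self):
        for chunk in ('data: 1\n\n', 'data: 2\n\n', 'data: 3\n\n'):
            yield chunk

    def close(self):
        # In the real Listener this is where the pubsub and the Redis
        # connection pool get torn down.
        self.closed = True

def serve(app_iter, write):
    """Minimal stand-in for a WSGI server's response loop (PEP 3333)."""
    try:
        for chunk in app_iter:
            write(chunk)  # raises if the client has disconnected
    finally:
        # Per the WSGI spec, close() runs whether or not the writes failed.
        if hasattr(app_iter, 'close'):
            app_iter.close()

def broken_write(chunk):
    raise IOError('client disconnected')

stream = Stream()
try:
    serve(stream, broken_write)
except IOError:
    pass
print(stream.closed)  # True: cleanup ran despite the mid-stream disconnect
```

The catch described in point 3 is that a real server only reaches the `finally` once the iterator yields something and the write fails, which is exactly why the no-op ping is needed.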

Here's the new code. First the Django view:

def event_stream(request): 
    """ 
    Stream worker events out to browser. 
    """ 
    return events.SSEResponse(
        settings.EVENTS_PUBSUB_URL, 
        channels=[settings.EVENTS_PUBSUB_CHANNEL], 
        buffer_key=settings.EVENTS_BUFFER_KEY, 
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID') 
    ) 

And the Listener class that does the work, along with a helper function to format the SSEs and an HttpResponse subclass that lets the view be a little cleaner:

class Listener(object): 
    def __init__(self, 
                 rcon_or_url=settings.EVENTS_PUBSUB_URL, 
                 channels=None, 
                 buffer_key=settings.EVENTS_BUFFER_KEY, 
                 last_event_id=None): 
        if isinstance(rcon_or_url, redis.StrictRedis): 
            self.rcon = rcon_or_url 
        elif isinstance(rcon_or_url, basestring): 
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url)) 
        if channels is None: 
            channels = [settings.EVENTS_PUBSUB_CHANNEL] 
        self.channels = channels 
        self.buffer_key = buffer_key 
        self.last_event_id = last_event_id 
        self.pubsub = self.rcon.pubsub() 
        self.pubsub.subscribe(channels) 

        # Send a superfluous message down the pubsub to flush out stale 
        # connections. 
        for channel in self.channels: 
            # Use buffer_key=None since these pings never need to be 
            # remembered and replayed. 
            sender = Sender(self.rcon, channel, None) 
            sender.publish('_flush', tags=['hidden']) 

    def __iter__(self): 
        # If we've been initted with a buffer key, then get all the events off 
        # that and spew them out before blocking on the pubsub. 
        if self.buffer_key: 
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1) 

            # Check whether the msg with last_event_id is still in the buffer. 
            # If so, trim buffered_events to contain only newer messages. 
            if self.last_event_id: 
                # Note that we're looping through the most recent messages 
                # first here. 
                counter = 0 
                for msg in buffered_events: 
                    if json.loads(msg)['id'] == self.last_event_id: 
                        break 
                    counter += 1 
                buffered_events = buffered_events[:counter] 

            for msg in reversed(list(buffered_events)): 
                # Stream out oldest messages first 
                yield to_sse({'data': msg}) 

        for msg in self.pubsub.listen(): 
            if msg['type'] == 'message': 
                yield to_sse(msg) 

    def close(self): 
        self.pubsub.close() 
        self.rcon.connection_pool.disconnect() 


class SSEResponse(HttpResponse): 
    def __init__(self, rcon_or_url, channels, buffer_key=None, 
                 last_event_id=None, *args, **kwargs): 
        self.listener = Listener(rcon_or_url, channels, buffer_key, 
                                 last_event_id) 
        super(SSEResponse, self).__init__(self.listener, 
                                          mimetype='text/event-stream', 
                                          *args, **kwargs) 

    def close(self): 
        """ 
        This will be called by the WSGI server at the end of the request, even 
        if the client disconnects midstream. Unless you're using Django's 
        runserver, in which case you should expect to see Redis connections 
        build up until http://bugs.python.org/issue16220 is fixed. 
        """ 
        self.listener.close() 


def to_sse(msg): 
    """ 
    Given a Redis pubsub message that was published by a Sender (ie, has a 
    JSON body with time, message, title, tags, and id), return a 
    properly-formatted SSE string. 
    """ 
    data = json.loads(msg['data']) 

    # According to the SSE spec, lines beginning with a colon should be 
    # ignored. We can use that as a way to force zombie listeners to try 
    # pushing something down the socket and clean up their redis connections 
    # when they get an error. 
    # See http://dev.w3.org/html5/eventsource/#event-stream-interpretation 
    if data['message'] == '_flush': 
        return ":\n"  # Administering colonic! 

    if 'id' in data: 
        out = "id: " + data['id'] + '\n' 
    else: 
        out = '' 
    if 'name' in data: 
        out += 'name: ' + data['name'] + '\n' 

    payload = json.dumps({ 
        'time': data['time'], 
        'message': data['message'], 
        'tags': data['tags'], 
        'title': data['title'], 
    }) 
    out += 'data: ' + payload + '\n\n' 
    return out
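On the receiving end, the browser's EventSource parses this format line by line. Here's a rough sketch of the interpretation rules the SSE spec describes, simplified (the real spec splits on the first colon, strips a single leading space, and joins multiline data fields), which also shows why the ":\n" ping is invisible to clients:

```python
def parse_sse(stream_text):
    """Simplified SSE event-stream interpretation: comment lines
    (starting with ':') are ignored, 'field: value' lines accumulate,
    and a blank line dispatches the pending event."""
    events, current = [], {}
    for line in stream_text.split('\n'):
        if line.startswith(':'):
            continue  # comment; this is why ':\n' works as a no-op ping
        if line == '':
            if current:
                events.append(current)
                current = {}
            continue
        field, _, value = line.partition(': ')
        current[field] = value
    return events

wire = ":\nid: 42\ndata: {\"message\": \"hi\"}\n\n"
print(parse_sse(wire))  # [{'id': '42', 'data': '{"message": "hi"}'}]
```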