我在我的Django應用程序中實現了一個Server Sent Event API,以便將來自後端的實時更新流傳輸到瀏覽器。後端是Redis pubsub。我的Django的看法是這樣的:Django:清理客戶端與流斷開連接後的redis連接
def event_stream(request):
"""
Stream worker events out to browser.
"""
listener = events.Listener(
settings.EVENTS_PUBSUB_URL,
channels=[settings.EVENTS_PUBSUB_CHANNEL],
buffer_key=settings.EVENTS_BUFFER_KEY,
last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
)
return http.HttpResponse(listener, mimetype='text/event-stream')
而且我要回作爲一個迭代的events.Listener類看起來是這樣的:
class Listener(object):
def __init__(self, rcon_or_url, channels, buffer_key=None,
last_event_id=None):
if isinstance(rcon_or_url, redis.StrictRedis):
self.rcon = rcon_or_url
elif isinstance(rcon_or_url, basestring):
self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
self.channels = channels
self.buffer_key = buffer_key
self.last_event_id = last_event_id
self.pubsub = self.rcon.pubsub()
self.pubsub.subscribe(channels)
def __iter__(self):
# If we've been initted with a buffer key, then get all the events off
# that and spew them out before blocking on the pubsub.
if self.buffer_key:
buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)
# check whether msg with last_event_id is still in buffer. If so,
# trim buffered_events to have only newer messages.
if self.last_event_id:
# Note that we're looping through most recent messages first,
# here
counter = 0
for msg in buffered_events:
if (json.loads(msg)['id'] == self.last_event_id):
break
counter += 1
buffered_events = buffered_events[:counter]
for msg in reversed(list(buffered_events)):
# Stream out oldest messages first
yield to_sse({'data': msg})
try:
for msg in self.pubsub.listen():
if msg['type'] == 'message':
yield to_sse(msg)
finally:
logging.info('Closing pubsub')
self.pubsub.close()
self.rcon.connection_pool.disconnect()
我能夠順利流事件出來的瀏覽器與此設置。但是,似乎聽者的「終於」中的斷開連接調用並沒有真正被調用。我假設他們仍在等待消息來自pubsub。隨着客戶端斷開連接並重新連接,我可以看到我的Redis實例的連接數量從未下降。一旦達到1000左右,Redis就會開始瘋狂並消耗所有可用的CPU。
我希望能夠檢測客戶端何時不再偵聽並關閉Redis連接。
事情我已經嘗試過或者想過:
- 連接池。但是,redis-py自述文件指出:「在線程之間傳遞PubSub或Pipeline對象是不安全的。」
- 處理連接或中斷的中間件。這不起作用,因爲中間件的process_response()方法被調用得太早(甚至在http頭被髮送到客戶端之前)。我需要在客戶端斷開連接時調用某些內容,而我正在向他們傳輸內容。
- request_finished和got_request_exception信號。第一個,像中間件中的process_response(),似乎太快了。當客戶端中斷連接時,第二個不會被調用。
最終的皺紋:在生產中我使用了Gevent,所以我可以避免一次打開很多連接。但是,無論我使用普通的'manage.py runserver'還是Gevent monkeypatched runserver或Gunicorn的gevent工作人員,都會發生此連接泄漏問題。