2015-05-10 55 views
2

我想通過以下這個簡單的配方來實現我的瓶的應用程序服務器發送的事件:http://flask.pocoo.org/snippets/116/gunicorn與GEVENT工人:使用一個共享的全局列表

服務於應用程序,我用gunicorn與GEVENT工人。

我的代碼的最低版本是這樣的:

import multiprocessing 

from gevent.queue import Queue 
from gunicorn.app.base import BaseApplication 
from flask import Flask, Response 

app = Flask('minimal') 
# NOTE: This is the global list of subscribers 
subscriptions = [] 


class ServerSentEvent(object): 
    def __init__(self, data): 
     self.data = data 
     self.event = None 
     self.id = None 
     self.desc_map = { 
      self.data: "data", 
      self.event: "event", 
      self.id: "id" 
     } 

    def encode(self): 
     if not self.data: 
      return "" 
     lines = ["%s: %s" % (v, k) 
       for k, v in self.desc_map.iteritems() if k] 
     return "%s\n\n" % "\n".join(lines) 


@app.route('/api/events') 
def subscribe_events(): 
    def gen(): 
     q = Queue() 
     print "New subscription!" 
     subscriptions.append(q) 
     print len(subscriptions) 
     print id(subscriptions) 
     try: 
      while True: 
       print "Waiting for data" 
       result = q.get() 
       print "Got data: " + result 
       ev = ServerSentEvent(unicode(result)) 
       yield ev.encode() 
     except GeneratorExit: 
      print "Removing subscription" 
      subscriptions.remove(q) 
    return Response(gen(), mimetype="text/event-stream") 


@app.route('/api/test') 
def push_event(): 
    print len(subscriptions) 
    print id(subscriptions) 
    for sub in subscriptions: 
     sub.put("test") 
    return "OK" 


class GunicornApplication(BaseApplication): 
    def __init__(self, wsgi_app, port=5000): 
     self.options = { 
      'bind': "0.0.0.0:{port}".format(port=port), 
      'workers': multiprocessing.cpu_count() + 1, 
      'worker_class': 'gevent', 
      'preload_app': True, 
     } 
     self.application = wsgi_app 
     super(GunicornApplication, self).__init__() 

    def load_config(self): 
     config = dict([(key, value) for key, value in self.options.iteritems() 
         if key in self.cfg.settings and value is not None]) 
     for key, value in config.iteritems(): 
      self.cfg.set(key.lower(), value) 

    def load(self): 
     return self.application 


if __name__ == '__main__': 
    gapp = GunicornApplication(app) 
    gapp.run() 

的問題是,用戶的名單,似乎是每一個工人不同。這意味着如果工作人員#1處理端點/api/events並向列表添加新訂戶,則客戶端將只接收在工作人員1還處理端點時添加的事件。

奇怪的是,實際的列表對象似乎是相同的每個工人,因爲id(subscriptions)返回每個工人相同的值。

有沒有辦法解決這個問題?我知道我可以使用Redis,但應用程序應該儘可能獨立,所以我試圖避免任何外部服務。

更新: 問題的原因似乎是在我的 gunicorn.app.base.BaseApplication的嵌入(這是一個 new feature in v0.19)。當 gunicorn -k gevent minimal:app運行在命令行應用程序,一切正常

更新2:先前的懷疑被證明是錯誤的,只是它的工作的原因是因爲gunicorn的默認工作進程的數目爲1當通過-w參數調整編號以符合代碼時,它表現出相同的行爲。

回答

1

你說:

實際列表對象似乎是每個工人一樣,因爲 ID(訂閱)返回每個工人相同的值。

但我認爲這不是真的,每個工人的subscriptions不是同一個對象。每個工人都是一個獨立的進程,擁有自己的內存空間。

對於自包含系統,您可以開發一個功能類似Redis的簡單版本的微型系統。例如,使用SQLite或ZeroMQ在這些工作人員之間進行通信。

相關問題