2016-03-04 48 views
0

我正在構建一個依賴於芹菜來處理一些長時間運行的任務的Flask應用程序。一旦完成處理,每個任務將基本上將一個字典追加到共享列表中 - 該列表由芹菜工作者和Flask應用程序的路由共享。 Flask組件基本上由一組路徑組成,用於檢索共享列表的內容並修改元素的順序。芹菜工和共享列表使用python的多處理

我瘦我已經成功地使用Python的多處理模塊中的管理器共享芹菜工人之間的列表。但是,Flask應用程序無法看到對此列表所做的更改。下面是一個最小的應用程序,它說明了問題:

import os 
import json 

from flask import Flask 
from multiprocessing import Manager 
from celery import Celery 

application = Flask(__name__) 

redis_url = os.environ.get('REDIS_URL') 
if redis_url is None: 
    redis_url = 'redis://localhost:6379/0' 

# Set the secret key to enable cookies 
application.secret_key = 'some secret key' 
application.config['SESSION_TYPE'] = 'filesystem' 

# Redis and Celery configuration 
application.config['BROKER_URL'] = redis_url 
application.config['CELERY_RESULT_BACKEND'] = redis_url 

celery = Celery(application.name, broker=redis_url) 
celery.conf.update(BROKER_URL=redis_url, 
       CELERY_RESULT_BACKEND=redis_url) 

manager = Manager() 
shared_queue = manager.list() # THIS IS THE SHARED LIST 

@application.route("/submit", methods=['GET']) 
def submit_song(): 
    add_song_to_queue.delay() 
    return 'Added a song to the queue' 

@application.route("/playlist", methods=['GET', 'POST']) 
def get_playlist(): 
    playlist = [] 
    i = 0 
    queue_size = len(shared_queue) 
    while i < queue_size: 
     print(shared_queue[i]) 
     playlist.append(shared_queue[i]) 
    return json.dumps(playlist) 

@celery.task 
def add_song_to_queue(): 
    shared_queue.append({'some':'data!'}) 
    print(len(shared_queue)) 

if __name__ == "__main__": 
    application.run(host='0.0.0.0', debug=True) 

在芹菜日誌,我可以清楚地看到字典被添加到列表中,而列表中的大小增加。但是,當我在瀏覽器上訪問/播放列表路由時,我總是會得到一個空列表。

有誰知道我可以如何讓列表在所有工作人員和Flask應用程序之間共享?

+2

我相信你有兩個Python解釋運行,所以你不能。您可以將播放列表存儲在數據庫中。這將是最好的選擇任何方式。 –

+0

@JoeDoherty我從來沒有考慮過我使用兩個單獨的解釋器。我真的想避免使用數據庫。您是否認爲將Celery for Process(target = add_song_to_queue)調用換出可能有效?這就意味着我只會使用一位口譯員。 – macalaca

+1

我想它會工作,但我不能推薦它。 Python不太適合多線程應用程序。我們通常採用更多流程進行擴展你對應用有多少流量?您可以使用像Redis這樣的簡單KV商店,而不是完整的RDBMS。 –

回答

1

我發現了一個解決方案,通過從Celery移開,並使用multiprocessing.Pool作爲任務隊列和通過Manager共享內存,如示例代碼中所示。此鏈接有如何的解決方案可以與瓶集成一個很好的例子:http://gouthamanbalaraman.com/blog/python-multiprocessing-as-a-task-queue.html

from multiprocessing import Pool 
from flask import Flask 

app = Flask(__name__) 
_pool = None 

def expensive_function(x): 
     # import packages that is used in this function 
     # do your expensive time consuming process 
     return x*x 

@app.route('/expensive_calc/<int:x>') 
def route_expcalc(x): 
     f = _pool.apply_async(expensive_function,[x]) 
     r = f.get(timeout=2) 
     return 'Result is %d'%r 

if __name__=='__main__': 
     _pool = Pool(processes=4) 
     try: 
       # insert production server deployment code 
       app.run() 
     except KeyboardInterrupt: 
       _pool.close() 
       _pool.join()