2014-10-20 45 views
7

當我的一個單元測試刪除SQLAlchemy對象時,該對象觸發after_delete事件,該事件觸發Celery任務從驅動器中刪除文件。SQLAlchemy事件觸發Celery任務時關閉連接

測試時的任務是CELERY_ALWAYS_EAGER = True

gist to reproduce the issue easily

的例子有兩個測試。一個在事件中觸發任務,另一個在事件外觸發。只有事件中的一個關閉連接。

快速重現,你可以運行錯誤:

git clone https://gist.github.com/5762792fc1d628843697.git 
cd 5762792fc1d628843697 
virtualenv venv 
. venv/bin/activate 
pip install -r requirements.txt 
python test.py 

堆棧:

$  python test.py 
E 
====================================================================== 
ERROR: test_delete_task (__main__.CeleryTestCase) 
---------------------------------------------------------------------- 
Traceback (most recent call last): 
    File "test.py", line 73, in test_delete_task 
    db.session.commit() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do 
    return getattr(self.registry(), name)(*args, **kwargs) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit 
    self.transaction.commit() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit 
    self._prepare_impl() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl 
    self.session.flush() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush 
    self._flush(objects) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush 
    transaction.rollback(_capture_exception=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__ 
    compat.reraise(type_, value, traceback) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush 
    transaction.rollback(_capture_exception=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback 
    self._assert_active(prepared_ok=True, rollback_ok=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active 
    raise sa_exc.ResourceClosedError(closed_msg) 
ResourceClosedError: This transaction is closed 

---------------------------------------------------------------------- 
Ran 1 test in 0.014s 

FAILED (errors=1) 
+0

請發佈錯誤和堆棧跟蹤。 – ACV 2014-10-21 23:28:18

+0

我剛剛更新了測試輸出。我還添加了在本地* nix機器上輕鬆複製的命令。 – deBrice 2014-10-22 00:55:35

回答

8

我想我找到了問題 - 這是你如何設置你的芹菜任務。如果您從您的芹菜安裝應用情境呼叫,樣樣精運行:

class ContextTask(TaskBase): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     # deleted --> with app.app_context(): 
     return TaskBase.__call__(self, *args, **kwargs) 

有一個在SQLAlchemy的文檔有關after_delete事件中從未修改會議一大警告:http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete

所以我懷疑with app.app_context():在刪除期間被調用,試圖附加到和/或修改Flask-SQLAlchemy存儲在app對象中的會話,因此整個事情都在轟炸。

Flask-SQlAlchemy在幕後爲你做了很多魔術,但你可以繞過這個並直接使用SQLAlchemy。如果您需要刪除活動期間交談的數據庫,你可以創建一個新的會話到DB:

@celery.task() 
def my_task(): 
    # obviously here I create a new object 
    session = db.create_scoped_session() 
    session.add(User(id=13, value="random string")) 
    session.commit() 
    return 

但它聽起來像你不需要這些,你只是想刪除的圖像路徑。在這種情況下,我只想改變你的任務,所以它需要一個路徑:

# instance will call the task 
@event.listens_for(User, "after_delete") 
def after_delete(mapper, connection, target): 
    my_task.delay(target.value) 

@celery.task() 
def my_task(image_path): 
    os.remove(image_path) 

希望這是有幫助 - 讓我知道如果有任何不爲你工作。感謝非常詳細的設置,它真的幫助調試。

+0

你釘了它,重新定義在應用上下文中運行的任務是個問題。我會看看是否可以在現有的'celery.task'之外定義'celery.task_with_context'裝飾器。非常感謝! – deBrice 2014-10-22 22:03:52

+0

完全沒問題!很高興幫助。 – 2014-10-22 22:40:51

+1

呵呵@RachelSanders,想念你在這裏遇到你。 – ACV 2014-10-23 14:33:06

0

問,芹菜的創造者,建議,解決方案on github

from celery import signals 

def make_celery(app): 
    ... 

    @signals.task_prerun.connect 
    def add_task_flask_context(sender, **kwargs): 
     if not sender.request.is_eager: 
      sender.request.flask_context = app.app_context().__enter__() 

    @signals.task_postrun.connect 
    def cleanup_task_flask_context(sender, **kwargs): 
     flask_context = getattr(sender.request, 'flask_context', None) 
     if flask_context is not None: 
      flask_context.__exit__(None, None, None) 
1

由deBrice提出的答案相似,但使用類似雷切爾的辦法。

class ContextTask(TaskBase): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     import flask 
     # tests will be run in unittest app context 
     if flask.current_app: 
      return TaskBase.__call__(self, *args, **kwargs) 
     else: 
      # actual workers need to enter worker app context 
      with app.app_context(): 
       return TaskBase.__call__(self, *args, **kwargs) 
相關問題