Celery

2013-05-28 146 views

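SQLAlchemy session issues with Celery

I have scheduled a few recurring tasks with Celery beat for our web application.

The application itself is built on the Pyramid web framework and uses the Zope transaction extension (ZopeTransactionExtension) to manage sessions.

In Celery, I use the application as a library. I redefine the session in the models with a function.

It works well, but after a while it raises InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction

I don't know what is wrong or why it raises these errors.

Sample code: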

tasks.py

def initialize_async_session(): 
    import sqlalchemy 
    from webapp.models import Base, set_dbsession, engine 

    Session = sqlalchemy.orm.scoped_session(
       sqlalchemy.orm.sessionmaker(autocommit=True, autoflush=True) 
          ) 
    Session.configure(bind=engine) 
    session = Session() 

    set_dbsession(session) 
    Base.metadata.bind = engine 
    return session 


@celery.task 
def rerun_scheduler(): 
    log.info("Starting pipeline scheduler") 
    session = initialize_async_session() 
    webapp.sheduledtask.service.check_for_updates(session) 
    log.info("Ending pipeline scheduler") 

models.py in the web application

DBSession = scoped_session(sessionmaker(bind=engine, expire_on_commit=False, 
        extension=ZopeTransactionExtension())) 

def set_dbsession(db_session=None): 
    """ 
    This function sets the db session 
    """ 
    global DBSession 
    if db_session:
        DBSession = db_session
        log.info("session changed to {0}".format(db_session))

UPDATE:

Tracebacks:

Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner 
    self.run() 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function 
    result = f(*args, **kwargs) 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function 
    result = f(*args, **kwargs) 
    File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run 
    self.table_params.set_task_status_as_finished() 
    File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished 
    task = Task.get_by_id(self.task_id) 
    File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/models.py", line 162, in get_by_id 
    return DBSession.query(cls).filter(cls.id == obj_id).first() 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2156, in first 
    ret = list(self[0:1]) 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2023, in __getitem__ 
    return list(res) 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2227, in __iter__ 
    return self._execute_and_instances(context) 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2240, in _execute_and_instances 
    close_with_result=True) 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2231, in _connection_from_session 
    **kw) 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 777, in connection 
    close_with_result=close_with_result) 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 781, in _connection_for_bind 
    return self.transaction._connection_for_bind(engine) 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 289, in _connection_for_bind 
    self._assert_is_active() 
    File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 217, in _assert_is_active 
    "This Session's transaction has been rolled back " 
InvalidRequestError: This Session's transaction has been rolled back by a nested rollback() call. To begin a new transaction, issue Session.rollback() first. 

######################################################################### 

[2013-05-30 14:32:57,782: WARNING/PoolWorker-3] Exception in thread Thread-4: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function 
    result = f(*args, **kwargs) 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function 
    result = f(*args, **kwargs) 
    File "/home/ranjith/wksp/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run 
    self.table_params.set_task_status_as_finished() 
    File "/home/ranjith/wksp/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished 
    task = Task.get_by_id(self.task_id) 
    File "/home/ranjith/wksp/mvc-service/webapp/webapp/models.py", line 166, in get_by_id 
    return DBSession.query(cls).filter(cls.id == obj_id).first() 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2145, in first 
    ret = list(self[0:1]) 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2012, in __getitem__ 
    return list(res) 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2216, in __iter__ 
    return self._execute_and_instances(context) 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2229, in _execute_and_instances 
    close_with_result=True) 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2220, in _connection_from_session 
    **kw) 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 798, in connection 
    close_with_result=close_with_result) 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 802, in _connection_for_bind 
    return self.transaction._connection_for_bind(engine) 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 281, in _connection_for_bind 
    self._assert_active() 
    File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 181, in _assert_active 
    "This session is in 'prepared' state; no further " 
InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction. 
Please post the full stack trace of your exception. – vvladymyrov

Which database system do you use to store the data? – vvladymyrov

@vvladymyrov I am using MySQL. I have pasted the traceback. –

Answer


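I believe the problem is that you are trying to use the web application's SQLAlchemy session inside your Celery task.

The first thing I recommend is creating two separate scoped sessions, one for the Celery application and another for the web application. Next, I would make sure the Celery database session is configured only once, during Celery initialization. You can use Celery's worker_init.connect to make sure the database session is set up when the worker starts (http://hynek.me/articles/using-celery-with-pyramid/).

It is very important that your web application does not use the same database session as your Celery application.

Something like this for your tasks.py file: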

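import logging

from celery import Celery
from celery.signals import worker_init
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

import webapp.sheduledtask.service

log = logging.getLogger(__name__)
celery = Celery('tasks')  # placeholder app name; configure the broker as needed

# Scoped session used only by the Celery worker, never by the web app.
Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))


@worker_init.connect
def initialize_session(**kwargs):
    # Celery signal handlers must accept arbitrary keyword arguments.
    # This runs once, when the worker starts.
    some_engine = create_engine('database_url')
    Session.configure(bind=some_engine)


@celery.task
def rerun_scheduler():
    log.info("Starting pipeline scheduler")
    webapp.sheduledtask.service.check_for_updates(Session)
    log.info("Ending pipeline scheduler")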
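
As a side note, the tracebacks in the question come from a separate thread ("Exception in thread Thread-4"), and set_dbsession() replaces the scoped_session registry with a single Session instance. That combination may mean one session object is being shared between threads, though this is an inference from the posted code rather than something confirmed. The minimal sketch below only illustrates the behaviour a scoped_session gives you by default: one session per thread, discarded with remove(). The helper name session_in_new_thread is made up for the illustration, and the registry is left unbound because no SQL is emitted.

```python
import threading

from sqlalchemy.orm import scoped_session, sessionmaker

# Unbound registry for illustration only; a real worker would call
# Session.configure(bind=engine) once at startup.
Session = scoped_session(sessionmaker())


def session_in_new_thread():
    """Return the session object that a different thread receives."""
    seen = []

    def worker():
        seen.append(Session())
        Session.remove()  # release this thread's session when done

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return seen[0]


main_first = Session()
main_second = Session()
other = session_in_new_thread()

# Calling the registry twice in one thread yields the same session object.
same_within_thread = main_first is main_second
# A different thread gets its own, separate session object.
shared_across_threads = main_first is other

# remove() closes and discards the current thread's session, so the
# next call to the registry creates a new one.
Session.remove()
fresh_after_remove = Session() is not main_first
Session.remove()

print(same_within_thread, shared_across_threads, fresh_after_remove)
```

If the model layer needs a module-level DBSession, pointing it at a registry like this one (rather than at the object returned by Session()) and calling remove() at the end of each task or thread is one way to keep that per-thread isolation.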

The problem is that some functions used inside 'webapp.sheduledtask.service.check_for_updates' depend on the Session, and they need the global session to be set at the model level. –