2015-06-03

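MySQL keeps losing its connection during Celery tasks

I'm trying to process an entire CSV file as fast as possible, so I want to process each line in parallel as a Celery task. The cleanup is also a Celery task, and it has to wait until every line has been processed. See the example below.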

The problem is that I can't seem to get through even one file, because I keep running into MySQL connection errors. So far I have seen these two: 2013, 'Lost connection to MySQL server during query' and 2006, 'MySQL server has gone away'.

from app.db.meta import Session 
from celery import chord, Celery 
from celery.signals import task_postrun 

celery = Celery() 
celery.config_from_object('config') 

@task_postrun.connect 
def close_session(*args, **kwargs): 
    Session.remove() 

def main(csv_file): 
    # queue one process_line task per CSV line so the lines run in parallel 
    header = [process_line.s(line) for line in csv_file] 
    # cleanup runs once, after every line is processed, and receives all their stats 
    callback = cleanup.s() 
    chord(header)(callback) 

@celery.task 
def process_line(line): 
    session = Session() 
    ... 
    # process line 
    ... 
    return stats 

@celery.task 
def cleanup(stats): 
    session = Session() 
    ... 
    # do cleanup and log stats (stats is the list of values returned by every process_line task) 
    ... 
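The chord pattern used above (run every header task in parallel, then call one callback with the list of all their results) can be mimicked locally when testing the per-line logic without a broker. This is a minimal sketch using `concurrent.futures`; `run_chord` is a hypothetical helper, not part of Celery's API, and the bodies of `process_line` and `cleanup` here are placeholder examples, not the asker's real tasks.

```python
from concurrent.futures import ThreadPoolExecutor


def run_chord(header_fn, items, callback, max_workers=4):
    """Hypothetical local analogue of chord(header)(callback): apply header_fn
    to every item in parallel, then call callback once with all results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() blocks until every header call has finished; map keeps input order
        results = list(pool.map(header_fn, items))
    return callback(results)


def process_line(line):
    # placeholder per-line work: count the comma-separated fields
    return {"fields": len(line.split(","))}


def cleanup(stats):
    # receives one stats entry per processed line
    return sum(s["fields"] for s in stats)


if __name__ == "__main__":
    print(run_chord(process_line, ["a,b,c", "d,e", "f"], cleanup))  # prints 6
```

As in a real chord, the callback only runs after all header calls have completed, which is exactly the ordering the cleanup task depends on.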

I'm using Celery 3.1.18 and SQLAlchemy 0.9.9. I'm also using connection pooling.

mysql> SHOW FULL PROCESSLIST;
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
| Id | User | Host      | db              | Command | Time | State | Info                  |
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
|  1 | root | localhost | ab__development | Sleep   | 4987 |       | NULL                  |
| 11 | root | localhost | ab__development | Sleep   | 1936 |       | NULL                  |
| 16 | root | localhost | ab__development | Sleep   |  143 |       | NULL                  |
| 17 | root | localhost | ab__development | Sleep   | 1045 |       | NULL                  |
| 18 | root | localhost | NULL            | Query   |    0 | init  | SHOW FULL PROCESSLIST |
| 21 | root | localhost | ab__development | Sleep   |    7 |       | NULL                  |
+----+------+-----------+-----------------+---------+------+-------+-----------------------+
6 rows in set (0.01 sec)

No value is set for 'max_connections', so I assume it's the default of 100. – BDuelz


Correction: the 'max_connections' default is 151. – BDuelz


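I can't copy the entire output of 'show processlist'. However, I see 6 rows, all with the same user and host. Five of them have the same db (the application database), and the other has NULL. Five show Sleep as the Command, and the other shows Query. Five have large Time values, and the other has 0. – BDuelz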

Answer


Read the answer. In short, you have to either disable SQLAlchemy's connection pool or ping the MySQL server each time a pooled connection is checked out:

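# the old flask.ext.sqlalchemy import path was removed in Flask 1.0
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import event, exc


def instance(app):
    """:rtype: SQLAlchemy"""
    db = SQLAlchemy(app)

    if app.testing:
        return db

    # Ping every connection as it is checked out of the pool, so a connection
    # the server has already closed is discarded instead of being used.
    @event.listens_for(db.engine, 'checkout')
    def checkout(dbapi_con, con_record, con_proxy):
        try:
            try:
                # MySQLdb: ping(False) checks the connection without auto-reconnecting
                dbapi_con.ping(False)
            except TypeError:
                # this driver's ping() does not accept the reconnect argument
                app.logger.debug('MySQL connection died. Restoring...')
                dbapi_con.ping()
        except dbapi_con.OperationalError as e:
            app.logger.warning(e)
            # includes 2006 "server has gone away" and 2013 "lost connection during query"
            if e.args[0] in (2006, 2013, 2014, 2045, 2055):
                # tells the pool to drop this connection and hand out a fresh one
                raise exc.DisconnectionError()
            else:
                raise

    return db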
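The ping-on-checkout idea can be illustrated without MySQL or SQLAlchemy. This is a minimal, stdlib-only sketch: `FakeConnection`, `PingingPool` and `OperationalError` are hypothetical stand-ins, not SQLAlchemy or MySQLdb classes. It roughly mirrors what SQLAlchemy's pool does when a checkout listener raises `DisconnectionError`: the dead connection is thrown away and a fresh one is used instead, while any other error still propagates.

```python
from collections import deque

# Error codes the answer's listener treats as "this connection is dead".
DISCONNECT_CODES = {2006, 2013, 2014, 2045, 2055}


class OperationalError(Exception):
    """Hypothetical stand-in for a DB-API OperationalError; args[0] is the MySQL error code."""


class FakeConnection:
    """Hypothetical stand-in for a MySQL DB-API connection."""

    def __init__(self):
        self.error_code = None  # None means the server still accepts this connection

    def ping(self):
        # Simulate ping failing once the server has dropped the connection.
        if self.error_code is not None:
            raise OperationalError(self.error_code, "simulated MySQL error")


class PingingPool:
    """Hypothetical minimal pool that pings each idle connection on checkout."""

    def __init__(self, factory):
        self._factory = factory  # callable that opens a new connection
        self._idle = deque()

    def checkout(self):
        while self._idle:
            conn = self._idle.popleft()
            try:
                conn.ping()
                return conn  # ping succeeded: reuse the pooled connection
            except OperationalError as e:
                if e.args[0] not in DISCONNECT_CODES:
                    raise  # a genuine error, not a dropped connection
                # dropped connection: discard it and try the next idle one
        return self._factory()  # nothing healthy is idle: open a new connection

    def checkin(self, conn):
        self._idle.append(conn)


if __name__ == "__main__":
    pool = PingingPool(FakeConnection)
    conn = pool.checkout()
    pool.checkin(conn)
    conn.error_code = 2006  # e.g. the server closed the idle connection after wait_timeout
    fresh = pool.checkout()
    print(fresh is conn)  # prints False: the dead connection was discarded
```

For the other option the answer mentions, disabling pooling, SQLAlchemy provides `sqlalchemy.pool.NullPool`, passed as `create_engine(url, poolclass=NullPool)`. Every checkout then opens a new connection, which costs a connect per use but never hands out a stale one.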