2015-07-19 37 views
6

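InterfaceError: connection already closed (using Django + Celery + Scrapy)

I get this error when running a Scrapy parse function (which can sometimes take up to 10 minutes) inside a Celery task.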

I am using:
- Django==1.6.5
- django-celery==3.1.16
- celery==3.1.16
- psycopg2==2.5.5 (I also tried psycopg2==2.5.4)

 
[2015-07-19 11:27:49,488: CRITICAL/MainProcess] Task myapp.parse_items[63fc40eb-c0d6-46f4-a64e-acce8301d29a] INTERNAL ERROR: InterfaceError('connection already closed',) 
Traceback (most recent call last): 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/app/trace.py", line 284, in trace_task 
    uuid, retval, SUCCESS, request=task_request, 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/celery/backends/base.py", line 248, in store_result 
    request=request, **kwargs) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/backends/database.py", line 29, in _store_result 
    traceback=traceback, children=self.current_task_children(request), 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 42, in _inner 
    return fun(*args, **kwargs) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 181, in store_result 
    'meta': {'children': children}}) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 87, in update_or_create 
    return get_queryset(self).update_or_create(**kwargs) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/djcelery/managers.py", line 70, in update_or_create 
    obj, created = self.get_or_create(**kwargs) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 376, in get_or_create 
    return self.get(**lookup), False 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 304, in get 
    num = len(clone) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 77, in __len__ 
    self._fetch_all() 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 857, in _fetch_all 
    self._result_cache = list(self.iterator()) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/query.py", line 220, in iterator 
    for row in compiler.results_iter(): 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 713, in results_iter 
    for rows in self.execute_sql(MULTI): 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 785, in execute_sql 
    cursor = self.connection.cursor() 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 160, in cursor 
    cursor = self.make_debug_cursor(self._cursor()) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 134, in _cursor 
    return self.create_cursor() 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/utils.py", line 99, in __exit__ 
    six.reraise(dj_exc_type, dj_exc_value, traceback) 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/__init__.py", line 134, in _cursor 
    return self.create_cursor() 
    File "/home/mo/Work/python/pb-env/local/lib/python2.7/site-packages/django/db/backends/postgresql_psycopg2/base.py", line 137, in create_cursor 
    cursor = self.connection.cursor() 
InterfaceError: connection already closed 
+0

Can you show the code that causes the problem?

Answers

5

Unfortunately, this is a problem with the Django + psycopg2 + Celery combination. It is an old and unsolved problem.

Take a look at this thread for background: https://github.com/celery/django-celery/issues/121

Basically, when Celery starts a worker, it forks a database connection from the django.db framework. If this connection drops for some reason, no new one is created. Celery is not to blame here, since there is no way to detect a dropped database connection through the django.db libraries. Django does not report it either, because it simply opens a connection when it receives a WSGI call (there is no connection pool). I had the same problem in a huge production environment with many worker machines, and sometimes those machines lost connectivity to the Postgres server.

I solved it by putting each Celery master process under a Linux supervisord handler with a watcher, and by implementing a decorator that handles psycopg2.InterfaceError. When that error occurs, the decorator dispatches a syscall that makes supervisor gracefully restart the Celery process with SIGINT.
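A minimal sketch of what such a decorator could look like, not the code I actually ran. The names `restart_on` and `sigint_parent` are hypothetical, and the exception types are passed in as a parameter so the sketch has no psycopg2 dependency; in a real project you would pass `(psycopg2.InterfaceError,)`. Sending SIGINT to the parent process assumes the task runs in a child of the supervised Celery master, which may not match your setup.

```python
import functools
import os
import signal


def restart_on(exc_types, restart):
    """Build a decorator that calls `restart` when one of `exc_types` escapes."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exc_types:
                restart()  # ask the supervised process to restart
                raise      # still surface the failure to the caller
        return wrapper
    return decorator


def sigint_parent():
    """Hypothetical restart hook: send SIGINT to the parent process.

    Assumption: the parent is the Celery master, and supervisord is
    configured to start it again after it exits.
    """
    os.kill(os.getppid(), signal.SIGINT)


# Possible wiring in a real project (assumption, not run here):
#   @restart_on((psycopg2.InterfaceError,), sigint_parent)
#   def parse_items(): ...
```

The error is re-raised after the restart hook runs, so the failed task is still recorded as failed instead of being silently swallowed.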

Edit:

Found a better solution. I implemented a Celery task base class like this:

from django.db import connection
import celery


class FaultTolerantTask(celery.Task):
    """Implements the after_return hook to close the invalid connection.

    This way, Django is forced to open a new connection for the next task.
    """
    abstract = True

    def after_return(self, *args, **kwargs):
        # Runs after every task, whether it succeeded or failed.
        connection.close()


@celery.task(base=FaultTolerantTask)
def my_task():
    # my database-dependent code here
    pass

I believe it will solve your problem too.

+0

Hi emanuelcds, could you share an example? I am facing the same problem and it would help to see sample code. Thanks

+0

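I came up with a better solution yesterday. I will implement it and check that it works. Once it does, I will edit this answer and let you know. Basically, you can use a base class for your Celery tasks through the `base` parameter of @app.task. If that fails, I will implement something that restarts the database connection. I will keep you posted here. – emanuelcds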

+1

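I see exactly the same exception from a daemonized Python script that accesses the database through the Django ORM (Django==1.9.5, psycopg2==2.6.1). If PostgreSQL is restarted while the daemon is running (or the database connection becomes invalid for some other reason), the connection is not re-created. It is possible to catch every possible DB exception and force a connection reset, but that is a big hack: http://stackoverflow.com/questions/4447497. I would love to see a general solution to this problem. – Noky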
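For reference, the "catch and reset" hack mentioned in the comment above can be sketched roughly as follows. This is only an illustration under stated assumptions: `retry_after_reset` is a hypothetical helper, and the exception types and the `reset` callable are injected so the sketch runs without Django. In a Django project, `reset` would typically be `django.db.connection.close`, since Django opens a new connection on the next query. Retrying like this is only reasonable for operations that are safe to repeat and that do not run inside an open transaction.

```python
import functools


def retry_after_reset(exc_types, reset, retries=1):
    """Retry the wrapped function, calling `reset` before each retry."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except exc_types:
                    if attempt == retries:
                        raise  # out of retries: surface the original error
                    reset()    # e.g. django.db.connection.close (assumption)
        return wrapper
    return decorator
```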

1

Guys and emanuelcds,

I had the same problem. I have now updated my code and created a new loader for Celery, in case you are using djcelery:

from djcelery.loaders import DjangoLoader
from django import db


class CustomDjangoLoader(DjangoLoader):
    def on_task_init(self, task_id, task):
        """Called before every task."""
        # Close connections that are unusable or have outlived their
        # maximum age, so Django opens a fresh one on the next query.
        for conn in db.connections.all():
            conn.close_if_unusable_or_obsolete()
        super(CustomDjangoLoader, self).on_task_init(task_id, task)

Of course, this also needs to be configured in the settings, like this:

import os

CELERY_LOADER = 'myproject.loaders.CustomDjangoLoader'
os.environ['CELERY_LOADER'] = CELERY_LOADER

I still have to test it; I will post an update.

+0

The trick is in the `close_if_unusable_or_obsolete` call. This is very useful! Thanks @PaoloC. Your answer should be accepted as the correct one. – emanuelcds