2011-10-10 34 views
3

我有一些非常簡單的使用芹菜螺紋的週期代碼;它只是打印「前」和「後」,並在兩者之間休息。它是由this StackOverflow question調整和this linked website芹菜periodic_task並行運行多次

from celery.task import task 
from celery.task import periodic_task 
from django.core.cache import cache 
from time import sleep 
import main 
import cutout_score 
from threading import Lock 

import socket 
from datetime import timedelta 
from celery.decorators import task, periodic_task 

def single_instance_task(timeout): 
    def task_exc(func): 
    def wrapper(*args, **kwargs): 
     lock_id = "celery-single-instance-" + func.__name__ 
     acquire_lock = lambda: cache.add(lock_id, "true", timeout) 
     release_lock = lambda: cache.delete(lock_id) 
     if acquire_lock(): 
      try: 
       func() 
      finally: 
       release_lock() 
    return wrapper 
    return task_exc 

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes 
@periodic_task(run_every = timedelta(seconds=2)) 
def test(): 
    lock_id = "lock" 

    # cache.add fails if if the key already exists 
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE) 
    # memcache delete is very slow, but we have to use it to take 
    # advantage of using add() for atomic locking 
    release_lock = lambda: cache.delete(lock_id) 

    if acquire_lock(): 
     try: 
      print 'pre' 
      sleep(20) 
      print 'post' 
     finally: 
      release_lock() 
     return 
    print 'already in use...' 

此代碼從未打印'already in use...';當我使用@single_instance_task裝飾器時會發生同樣的現象。

你知道怎麼回事嗎?

編輯:我簡化了問題,以便它不寫入內存(使用全局或Django緩存);我還從來沒有看到'already in use...'


編輯:當我從https://docs.djangoproject.com/en/dev/topics/cache/一切添加以下代碼到我的Django的settings.py文件(通過改變代碼工作希望,但只有當我使用端口11211 (奇怪的是,我的服務器是在端口8000)

CACHES = { 
    'default': { 
     'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache', 
     'LOCATION': [ 
      '127.0.0.1:11211' 
     ] 
    } 
} 

回答

3

你是如何運行的celeryd?我不熟悉的螺紋選項。

如果它正在運行多進程,則不存在工作人員之間共享內存的「全局」變量。

如果你想在所有工人之間共享一個計數器,那麼我建議你使用cache.incr

如:

In [1]: from django.core.cache import cache 

In [2]: cache.set('counter',0) 

In [3]: cache.incr('counter') 
Out[3]: 1 

In [4]: cache.incr('counter') 
Out[4]: 2 

更新

,如果你強迫你的任務在睡眠重疊會發生什麼,如:

print "Task on %r started" % (self,) 
sleep(20) 
print "Task on %r stopped" % (self,) 

如果你沒有得到「已經在使用...」從更頻繁地運行這20秒鐘ñ你知道緩存不符合預期。


另一個更新

你有設置在你的Django設置的緩存後端?例如。如果沒有,你可以使用Dummy Cache它實際上並沒有做任何緩存memcached的

,只是實現的接口 ...這聽起來像你的問題的一個有說服力的原因。

+0

+1這聽起來像是與我的問題有關。我嘗試過使用緩存,但仍然看到'計數器'的不穩定值。另外,我看到有多名工作人員進入「測試」功能。我正在用django運行芹菜:'python管理。py celeryd -v 2 -B -s celery -E -l INFO' – user

+0

即使我簡化以至於'test'函數只是打印出「hello」,它就會在不同的工作人員上運行並且打印得太頻繁(即使我有'@ single_instance_task'裝飾器定義)。 – user

+0

我簡化了代碼(上面),以便它只打印(如您所建議的那樣)。它仍然不會打印「已經在使用中......」;不知怎的,緩存沒有成功鎖定。 – user