我有一些非常簡單的使用芹菜螺紋的週期代碼;它只是打印「前」和「後」,並在兩者之間休息。它是由this StackOverflow question調整和this linked website芹菜periodic_task並行運行多次
from celery.task import task
from celery.task import periodic_task
from django.core.cache import cache
from time import sleep
import main
import cutout_score
from threading import Lock
import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
def single_instance_task(timeout):
def task_exc(func):
def wrapper(*args, **kwargs):
lock_id = "celery-single-instance-" + func.__name__
acquire_lock = lambda: cache.add(lock_id, "true", timeout)
release_lock = lambda: cache.delete(lock_id)
if acquire_lock():
try:
func()
finally:
release_lock()
return wrapper
return task_exc
LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@periodic_task(run_every = timedelta(seconds=2))
def test():
lock_id = "lock"
# cache.add fails if if the key already exists
acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
release_lock = lambda: cache.delete(lock_id)
if acquire_lock():
try:
print 'pre'
sleep(20)
print 'post'
finally:
release_lock()
return
print 'already in use...'
此代碼從未打印'already in use...'
;當我使用@single_instance_task
裝飾器時會發生同樣的現象。
你知道怎麼回事嗎?
編輯:我簡化了問題,以便它不寫入內存(使用全局或Django緩存);我還從來沒有看到'already in use...'
編輯:當我從https://docs.djangoproject.com/en/dev/topics/cache/一切添加以下代碼到我的Django的settings.py文件(通過改變代碼工作希望,但只有當我使用端口11211 (奇怪的是,我的服務器是在端口8000)
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
'LOCATION': [
'127.0.0.1:11211'
]
}
}
+1這聽起來像是與我的問題有關。我嘗試過使用緩存,但仍然看到'計數器'的不穩定值。另外,我看到有多名工作人員進入「測試」功能。我正在用django運行芹菜:'python管理。py celeryd -v 2 -B -s celery -E -l INFO' – user
即使我簡化以至於'test'函數只是打印出「hello」,它就會在不同的工作人員上運行並且打印得太頻繁(即使我有'@ single_instance_task'裝飾器定義)。 – user
我簡化了代碼(上面),以便它只打印(如您所建議的那樣)。它仍然不會打印「已經在使用中......」;不知怎的,緩存沒有成功鎖定。 – user