Is there a standard way to debounce Celery tasks?
For example, so that a task can be "started" multiple times, but will only run once, after some delay:
```python
def debounce_task(task):
    # task_is_queued() is hypothetical: it is what the question asks how to write.
    if task_is_queued(task):
        return
    task.apply_async(countdown=30)
```
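To pin down the behaviour being asked for, here is a deterministic, in-process sketch of trailing-edge debouncing. `Debouncer` is a hypothetical class with an explicit clock (no Celery, no Redis): every trigger restarts the delay, and the function runs once, after the last trigger, with the latest arguments. The pseudocode above instead keeps the first scheduled run and ignores later calls; the answers below cover both variants.

```python
from typing import Any, Callable, List, Optional, Tuple


class Debouncer:
    """Hypothetical in-process model of trailing-edge debouncing.

    Every ``trigger`` pushes the deadline back to ``now + delay`` and remembers
    the latest arguments; ``tick`` runs the function once the deadline passes.
    """

    def __init__(self, func: Callable[..., Any], delay: float) -> None:
        self.func = func
        self.delay = delay
        self.deadline: Optional[float] = None
        self.pending: Optional[Tuple[tuple, dict]] = None

    def trigger(self, now: float, *args: Any, **kwargs: Any) -> None:
        # A new trigger replaces any pending call and restarts the delay.
        self.deadline = now + self.delay
        self.pending = (args, kwargs)

    def tick(self, now: float) -> None:
        # Fire at most once per burst, only after the quiet period has elapsed.
        if self.deadline is not None and now >= self.deadline:
            args, kwargs = self.pending
            self.deadline = None
            self.pending = None
            self.func(*args, **kwargs)


calls: List[str] = []
d = Debouncer(calls.append, delay=30)
d.trigger(0, "a")
d.trigger(10, "b")
d.trigger(20, "c")
d.tick(45)  # deadline is 50: nothing runs yet
d.tick(50)  # runs once, with the latest argument
print(calls)  # ['c']
```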
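Here's how we do it with Redis counters. All of this could probably be generalized into a decorator, but we only use it for specific tasks (webhooks).
Your public task is the one you call from other functions. It needs to increment a key in Redis. The key is formed from your function's arguments, whatever they may be (this ensures the counter is unique to each distinct task invocation).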
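```python
@task
def your_public_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    get_redis().incr(cache_key)
    # Schedule the real task; apply_async takes args/kwargs separately from countdown.
    _your_task.apply_async(args=args, kwargs=kwargs, countdown=settings.QUEUE_DELAY)
```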
Note that the cache key function is shared (you want the same cache key in each function), and so is the `countdown` setting.
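The answer does not show `make_public_task_cache_key`. One possible shape, assuming the task arguments are JSON-serializable, is sketched below; the `debounce:your_task:` prefix and the SHA-1 hashing are illustrative choices, not from the answer. Sorting keys makes the key independent of keyword-argument order, and hashing keeps it short regardless of argument size.

```python
import hashlib
import json


def make_public_task_cache_key(*args, **kwargs):
    """Hypothetical key builder: identical arguments map to the same Redis key.

    Assumes the arguments are JSON-serializable. ``sort_keys=True`` makes the
    key independent of the order in which keyword arguments were passed.
    """
    payload = json.dumps([args, kwargs], sort_keys=True)
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    return "debounce:your_task:%s" % digest


print(make_public_task_cache_key(42, lang="en", force=True)
      == make_public_task_cache_key(42, force=True, lang="en"))  # True
```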
Then, the code that actually executes the task does the following:
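```python
@task
def _your_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    counter = get_redis().getset(cache_key, 0)
    # Redis returns the old value as a string/bytes (or None if the key is
    # missing), so normalise it before comparing.
    if counter is None or int(counter) == 0:
        return
    # ... execute your actual task code.
```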
This lets you call `your_public_task.delay(..)` as many times as you like within your `QUEUE_DELAY`, and it will only fire once.
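Here's how you can do this with Mongo.
Note: I had to make the design a bit more forgiving, because Celery tasks aren't guaranteed to execute at the exact moment their `eta` or `countdown` runs out.
Also, Mongo expiring indexes are only cleaned up roughly once a minute, so you can't build the design around records being deleted the moment the `eta` arrives.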
Anyway, the flow goes like this:
1. `my_task` calls `preflight`, which increments a call counter and returns it as `flight_id`.
2. `_my_task` is scheduled to execute after `TTL` seconds.
3. When `_my_task` runs, it checks whether its `flight_id` is still the latest one. If not, it aborts.

```python
@celery.task(track_started=False, ignore_result=True)
def my_task(my_arg):
    # HASH() stands for any stable hash of the arguments; it is not defined in this answer.
    flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL)
    _my_task.apply_async((my_arg,), {'flight_id': flight_id}, countdown=TTL)


@celery.task(track_started=False, ignore_result=True)
def _my_task(my_arg, flight_id=None):
    if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id):
        return
    # ... actual work ... #
```
Library code:
```python
import datetime

import pymongo

TTL = 5 * 60  # Run tasks after 5 minutes
EXPIRY = 6 * TTL  # This needs to be much larger than TTL.

# We need to store a list of task-executions currently pending.
# ``db`` is assumed to be an existing pymongo Database object.
inflight_collection = db['celery_In_Flight']
inflight_collection.create_index([('fn', pymongo.ASCENDING),
                                  ('key', pymongo.ASCENDING)])
# TTL index: MongoDB removes documents EXPIRY seconds after their ``eta``.
inflight_collection.create_index('eta', expireAfterSeconds=EXPIRY)


def preflight(collection, fn, key, ttl):
    # TTL indexes are evaluated in UTC, so store a timezone-aware UTC datetime.
    eta = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=ttl)
    result = collection.find_one_and_update({
        'fn': fn,
        'key': key,
    }, {
        '$set': {
            'eta': eta
        },
        '$inc': {
            'flightId': 1
        }
    }, upsert=True, return_document=pymongo.ReturnDocument.AFTER)
    print('Preflight[{}][{}] = {}'.format(fn, key, result['flightId']))
    return result['flightId']


def check_for_takeoff(collection, fn, key, flight_id):
    result = collection.find_one({
        'fn': fn,
        'key': key
    })
    # A missing record (e.g. already expired) lets the task run.
    ready = result is None or result['flightId'] == flight_id
    print('Check[{}][{}] = {}, {}'.format(
        fn, key, result['flightId'] if result else None, ready))
    return ready
```
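The flight-id logic can be tried without MongoDB. `FakeInflight` below is a hypothetical dict-backed stand-in (no TTL index, no `eta`) that keeps only the counter semantics: each `preflight` bumps `flightId`, and only the queued run whose id is still the latest passes `check_for_takeoff`, so the last call of a burst wins. A missing record (for example, one already removed by the TTL index) also lets the run proceed, mirroring the `result is None` branch above.

```python
class FakeInflight:
    """Hypothetical dict-backed stand-in for the ``celery_In_Flight`` collection.

    Models only the upsert that increments ``flightId`` (preflight) and the
    lookup that compares it (check_for_takeoff).
    """

    def __init__(self):
        self.docs = {}

    def preflight(self, fn, key):
        doc = self.docs.setdefault((fn, key), {"flightId": 0})
        doc["flightId"] += 1
        return doc["flightId"]

    def check_for_takeoff(self, fn, key, flight_id):
        doc = self.docs.get((fn, key))
        return doc is None or doc["flightId"] == flight_id


inflight = FakeInflight()
ids = [inflight.preflight("my_task", "k") for _ in range(3)]
took_off = [fid for fid in ids if inflight.check_for_takeoff("my_task", "k", fid)]
print(ids, took_off)  # [1, 2, 3] [3]
```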
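bartek had the right idea: use Redis counters, which are atomic (and should be easy to get at if your broker is Redis). However, his solution throttles rather than debounces. The difference is small, though (`getset` vs `decr`).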
Queue up the task:

```python
conn = get_redis()
conn.incr(key)
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
```
Then, in the task:

```python
conn = get_redis()
counter = conn.decr(key)
if counter > 0:
    # task is still queued
    return
# continue on to rest of task
```
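It's hard to turn this into a decorator, because you need to decorate both the task and the call to the task itself. So you'd need a decorator both before and after the Celery `@task` decorator.
For now, I've just written a few functions that help me call the task, and one that does the check at the start of the task.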
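The helper functions themselves are not shown. One possible shape, with the hypothetical names `enqueue_debounced` and `should_run`, is sketched below; the Redis connection and the Celery task are replaced by in-memory fakes so the counting can be checked. Three enqueues produce three queued copies, and only the last one to run gets the counter down to zero, which is the debounce behaviour (in the `getset` variant the first run wins instead).

```python
class FakeRedis:
    """Hypothetical in-memory stand-in for Redis INCR/DECR."""

    def __init__(self):
        self.data = {}

    def incr(self, key):
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]

    def decr(self, key):
        self.data[key] = self.data.get(key, 0) - 1
        return self.data[key]


class FakeTask:
    """Records apply_async calls instead of sending them to a broker."""

    def __init__(self):
        self.queued = []

    def apply_async(self, args=(), kwargs=None, countdown=None):
        self.queued.append((args, kwargs, countdown))


def enqueue_debounced(conn, key, task, args=(), kwargs=None, countdown=30):
    """Caller side: count the request, then schedule the task."""
    conn.incr(key)
    task.apply_async(args=args, kwargs=kwargs or {}, countdown=countdown)


def should_run(conn, key):
    """Task side: only the last queued copy sees the counter reach zero."""
    return conn.decr(key) <= 0


conn, task, ran = FakeRedis(), FakeTask(), []
for n in range(3):
    enqueue_debounced(conn, "report:1", task, args=(n,))
for args, _, _ in task.queued:  # each queued copy eventually runs
    if should_run(conn, "report:1"):
        ran.append(args[0])
print(ran)  # [2]
```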
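http://stackoverflow.com/a/43625455/4391298 is the solution I eventually ended up with, including some key-expiry handling in case things fall over (not a problem in the original solution). – 2017-04-26 05:06:42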
Here's the solution I came up with: https://gist.github.com/wolever/3cf2305613052f3810a271e09d42e35c
Copied here for posterity:
```python
import time

import redis


def get_redis_connection():
    # decode_responses=True makes GET return str, so run keys read "key!<timestamp>".
    return redis.Redis(decode_responses=True)


class TaskDebouncer(object):
    """ A simple Celery task debouncer.

    Usage::

        def debounce_process_corpus(corpus):
            # Only one task with ``key`` will be allowed to execute at a
            # time. For example, if the task was resizing an image, the key
            # might be the image's URL.
            key = "process_corpus:%s" %(corpus.id,)
            TaskDebouncer.delay(
                key, process_corpus, args=[corpus.id], countdown=0,
            )

        @task(bind=True)
        def process_corpus(self, corpus_id, debounce_key=None):
            debounce = TaskDebouncer(debounce_key, keepalive=30)

            corpus = Corpus.load(corpus_id)

            try:
                for item in corpus:
                    item.process()

                    # If ``debounce.keepalive()`` isn't called every
                    # ``keepalive`` interval (the ``keepalive=30`` in the
                    # call to ``TaskDebouncer(...)``) the task will be
                    # considered dead and another one will be allowed to
                    # start.
                    debounce.keepalive()

            finally:
                # ``finalize()`` will mark the task as complete and allow
                # subsequent tasks to execute. If it returns true, there
                # was another attempt to start a task with the same key
                # while this task was running. Depending on your business
                # logic, this might indicate that the task should be
                # retried.
                needs_retry = debounce.finalize()
                if needs_retry:
                    raise self.retry(max_retries=None)
    """

    def __init__(self, key, keepalive=60):
        if key:
            self.key = key.partition("!")[0]
            self.run_key = key
        else:
            self.key = None
            self.run_key = None
        self._keepalive = keepalive
        self.cxn = get_redis_connection()
        self.init()
        self.keepalive()

    @classmethod
    def delay(cls, key, task, args=None, kwargs=None, countdown=30):
        cxn = get_redis_connection()
        now = int(time.time())
        # SET NX EX: only the first caller within the window creates the key.
        first = cxn.set(key, now, nx=True, ex=countdown + 10)
        if not first:
            now = cxn.get(key)

        run_key = "%s!%s" %(key, now)
        if first:
            kwargs = dict(kwargs or {})
            kwargs["debounce_key"] = run_key
            task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

        return (first, run_key)

    def init(self):
        self.initial = self.key and self.cxn.get(self.key)

    def keepalive(self, expire=None):
        if self.key is None:
            return
        expire = expire if expire is not None else self._keepalive
        self.cxn.expire(self.key, expire)

    def is_out_of_date(self):
        if self.key is None:
            return False
        return self.cxn.get(self.key) != self.initial

    def finalize(self):
        if self.key is None:
            return False
        # Optimistic transaction: delete the key only if nobody changed it.
        with self.cxn.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(self.key)
                    if pipe.get(self.key) != self.initial:
                        return True
                    pipe.multi()
                    pipe.delete(self.key)
                    pipe.execute()
                    break
                except redis.WatchError:
                    continue
        return False
```
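The heart of `TaskDebouncer.delay` is Redis `SET key value NX EX seconds`: only the first caller within the window creates the key and schedules the task, and later callers just read back the stored timestamp. The sketch below replays that step with a hypothetical in-memory `FakeRedis` and a manual clock, so the `countdown + 10` expiry window can be seen without a server; `debounce_gate` is an illustrative name for the first half of `delay`, not part of the gist.

```python
import time


class FakeRedis:
    """Hypothetical in-memory stand-in for ``SET key value NX EX seconds`` and ``GET``."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.data = {}  # key -> (value, expires_at)

    def _live(self, key):
        item = self.data.get(key)
        if item is not None and item[1] <= self.clock():
            del self.data[key]  # lazily drop expired keys
            item = None
        return item

    def set(self, key, value, nx=False, ex=None):
        if nx and self._live(key) is not None:
            return None  # falsy, like redis-py when NX blocks the write
        expires_at = self.clock() + ex if ex is not None else float("inf")
        self.data[key] = (value, expires_at)
        return True

    def get(self, key):
        item = self._live(key)
        return None if item is None else item[0]


def debounce_gate(cxn, key, now, countdown):
    """Mirror of the first half of ``TaskDebouncer.delay``: returns (first, run_key)."""
    first = cxn.set(key, now, nx=True, ex=countdown + 10)
    if not first:
        now = cxn.get(key)
    return bool(first), "%s!%s" % (key, now)


t = [0.0]
cxn = FakeRedis(clock=lambda: t[0])
first_call = debounce_gate(cxn, "process_corpus:7", 100, countdown=30)
t[0] = 5.0
second_call = debounce_gate(cxn, "process_corpus:7", 105, countdown=30)
t[0] = 41.0  # past countdown + 10 seconds, so the key has expired
third_call = debounce_gate(cxn, "process_corpus:7", 141, countdown=30)
print(first_call, second_call, third_call)
```

Only `first_call` and `third_call` would schedule the task; `second_call` reuses the run key of the pending one.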
Here's a more fleshed-out solution based on https://stackoverflow.com/a/28157498/4391298, but turned into a decorator, and reaching into the Kombu connection pool to reuse your Redis counter.
```python
import logging
from functools import wraps

# Not strictly required
from django.core.exceptions import ImproperlyConfigured
from django.core.cache.utils import make_template_fragment_key

from celery.utils import gen_task_name

# ``app`` is assumed to be your project's Celery application instance,
# imported from wherever ``Celery(...)`` is created.

LOGGER = logging.getLogger(__name__)


def debounced_task(**options):
    """Debounced task decorator."""

    try:
        countdown = options.pop('countdown')
    except KeyError:
        raise ImproperlyConfigured("Debounced tasks require a countdown")

    def factory(func):
        """Decorator factory."""
        try:
            name = options.pop('name')
        except KeyError:
            name = gen_task_name(app, func.__name__, func.__module__)

        @wraps(func)
        def inner(*args, **kwargs):
            """Decorated function."""

            key = make_template_fragment_key(name, [args, kwargs])
            with app.pool.acquire_channel(block=True) as (_, channel):
                depth = channel.client.decr(key)

                if depth <= 0:
                    try:
                        func(*args, **kwargs)
                    except:
                        # The task failed (or is going to retry), set the
                        # count back to where it was
                        channel.client.set(key, depth)
                        raise
                else:
                    LOGGER.debug("%s calls pending to %s",
                                 depth, name)

        task = app._task_from_fun(inner, **options, name=name + '__debounced')

        @wraps(func)
        def debouncer(*args, **kwargs):
            """
            Debouncer that calls the real task.
            This is the task we are scheduling."""

            key = make_template_fragment_key(name, [args, kwargs])
            with app.pool.acquire_channel(block=True) as (_, channel):
                depth = channel.client.incr(key)
                # Mark this key to expire after the countdown, in case our
                # task never runs or runs too many times, we want to clean
                # up our Redis to eventually resolve the issue.
                # (EXPIRE comes after INCR: on a missing key it is a no-op.)
                channel.client.expire(key, countdown + 10)
                LOGGER.debug("Requesting %s in %i seconds (depth=%s)",
                             name, countdown, depth)
                task.si(*args, **kwargs).apply_async(countdown=countdown)

        return app._task_from_fun(debouncer, **options, name=name)

    return factory
```
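You could use a cache. Many key-value stores support expiring records: try to fetch the result from the store, and if there is none, run the task and store the result with an expiry time before returning. Use only one worker so that tasks execute sequentially. Avoid locking schemes unless you want to deal with stale locks. – 2015-01-26 18:53:29
Oh, absolutely. But I'd rather avoid the annoyance of implementing the introspection myself (inspecting arguments, tracking results, etc.), and I was wondering whether there's a standard way to do this. – 2015-01-26 18:55:40
Writing a caching decorator in Python is simple (maybe 4 lines); I wish I had time to post a full answer. – 2015-01-26 18:59:42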
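A minimal sketch of the caching decorator the comments describe, under two stated assumptions: the cache lives in the worker process (a plain dict, not the shared key-value store the first comment suggests) and the arguments are hashable. `ttl_cache` is a hypothetical name. Note that it deduplicates work by reusing a recent result rather than delaying execution, so it is not strictly a debounce.

```python
import functools
import time


def ttl_cache(seconds, clock=time.monotonic):
    """Cache a function's result per argument tuple for ``seconds``.

    In-process only: each worker process keeps its own cache.
    """
    def decorator(func):
        store = {}  # key -> (expires_at, result)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            now = clock()
            hit = store.get(key)
            if hit is not None and hit[0] > now:
                return hit[1]  # fresh cached result: skip the work
            result = func(*args, **kwargs)
            store[key] = (now + seconds, result)
            return result

        return wrapper

    return decorator


t = [0.0]
calls = []


@ttl_cache(30, clock=lambda: t[0])
def expensive(x):
    calls.append(x)
    return x * 2


expensive(2)
expensive(2)  # served from the cache
t[0] = 31.0
expensive(2)  # entry expired: runs again
print(calls)  # [2, 2]
```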