2015-01-26 180 views
6

Is there a standard way to debounce Celery tasks?

For example, a task that can be "started" many times but only runs once, after some delay:

def debounce_task(task):
    # task_is_queued() is hypothetical: it is exactly the check being asked about
    if task_is_queued(task):
        return
    task.apply_async(countdown=30)
+0

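You could use a cache. Many key-value stores support expiring keys: try to fetch the result from the store, and if there is none, run the task and store its result with an expiry time before returning. Use only one worker so that tasks execute sequentially. Avoid locking schemes unless you want to deal with stale locks. – 2015-01-26 18:53:29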

+0

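Oh, absolutely. But I'd rather avoid the annoyance of doing the introspection myself (checking arguments, tracking results, etc.), so I wanted to know whether there is a standard way to do this. – 2015-01-26 18:55:40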

+0

Writing a caching decorator in Python is simple (maybe 4 lines). I wish I had time to post a full answer. – 2015-01-26 18:59:42
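A minimal sketch of the caching decorator the comments describe, assuming an in-process dict with per-entry expiry stands in for a key-value store with expiring keys (the names cached and expensive are illustrative only). Note that this suppresses repeat runs within the TTL window; it is caching, not a true delayed debounce.

```python
import functools
import time


def cached(ttl_seconds):
    """Cache results per argument tuple for ttl_seconds.

    Assumption: an in-process dict replaces the external key-value store.
    """
    def decorator(func):
        store = {}  # key -> (expires_at, result)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()
            hit = store.get(key)
            if hit is not None and hit[0] > now:
                return hit[1]  # fresh cached result: skip the work
            result = func(*args, **kwargs)
            store[key] = (now + ttl_seconds, result)
            return result
        return wrapper
    return decorator


calls = []


@cached(ttl_seconds=30)
def expensive(x):
    calls.append(x)  # record every real execution
    return x * 2
```

Calling expensive(3) twice within 30 seconds runs the body only once.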

Answers

4

Here's how we do it with Redis counters. All of this could probably be generalized into a decorator, but we only use it for one specific task (webhooks).

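Your public task is the one you call from other functions. It increments a key in Redis. The key is built from your function's arguments, whatever they may be (this makes the counter unique to each distinct task invocation).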

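@task
def your_public_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    # Count how many times this task has been requested with these arguments
    get_redis().incr(cache_key)
    # Schedule the real task. Calling _your_task(...) directly would run it
    # synchronously and pass countdown as an ordinary task argument.
    _your_task.apply_async(args=args, kwargs=kwargs, countdown=settings.QUEUE_DELAY)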

Note that the cache-key function is shared (you want the same cache key in both functions), and so is the countdown setting.

Then the task that actually executes your code does the following:

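@task
def _your_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    # Atomically read the counter and reset it to zero
    counter = get_redis().getset(cache_key, 0)
    # Redis returns the old value as a string/bytes (or None if the key is
    # missing). Zero means an earlier copy of this task already did the work.
    if counter is None or int(counter) == 0:
        return

    # ... execute your actual task code ...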

This lets you call your_public_task.delay(..) as many times as you like within QUEUE_DELAY, and it will only fire once.
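To see why only one copy does the work, here is a small simulation of the INCR/GETSET scheme, assuming a tiny in-memory FakeRedis stands in for Redis and a plain list stands in for Celery's delayed queue (both are illustrative, not real APIs). The first queued copy to run resets the counter; the rest see zero and exit.

```python
class FakeRedis:
    """In-memory stand-in for the two Redis commands used (assumption)."""

    def __init__(self):
        self.data = {}

    def incr(self, key):
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]

    def getset(self, key, value):
        old = self.data.get(key)
        self.data[key] = int(value)
        # Real redis-py returns bytes (or None when the key is missing)
        return None if old is None else str(old).encode()


redis_conn = FakeRedis()
queue = []     # stands in for tasks scheduled with countdown=QUEUE_DELAY
executed = []


def your_public_task(*args):
    key = ("your_task",) + args          # hypothetical cache key
    redis_conn.incr(key)
    queue.append(args)


def _your_task(*args):
    key = ("your_task",) + args
    counter = redis_conn.getset(key, 0)
    if counter is None or int(counter) == 0:
        return
    executed.append(args)               # the "real work"


for _ in range(3):
    your_public_task("order-42")

# ...the countdown elapses and the worker runs every queued copy...
while queue:
    _your_task(*queue.pop(0))
```

After the loop, executed holds a single entry even though three copies were queued.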

1

Here's how you can do it with Mongo.

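Note: I had to make the design a bit more forgiving, because Celery tasks are not guaranteed to execute at the exact moment their eta or countdown runs out.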

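Also, Mongo's expiring (TTL) indexes are only cleaned up about once a minute, so you can't rely on the record having been deleted by the time the eta comes around.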

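In short, the flow goes like this:

  1. Client code calls my_task.
  2. preflight increments a call counter and returns it as the flight_id.
  3. my_task schedules _my_task to execute after TTL seconds and returns.
  4. When _my_task runs, it checks whether it still holds the latest flight_id. If not, it aborts.
  5. ...some time later... Mongo cleans stale entries out of the collection, via an expiring index.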
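The flight_id check can be illustrated without Mongo. This sketch assumes a plain dict replaces the inflight collection and a list replaces Celery's countdown queue; the function names mirror the answer's, but these bodies are simplified stand-ins, not the answer's library code.

```python
inflight = {}    # (fn, key) -> latest flight_id; stands in for the Mongo collection
scheduled = []   # (my_arg, flight_id) pairs waiting for their countdown
ran = []


def preflight(fn, key):
    """Bump and return the flight id for (fn, key), like the $inc upsert."""
    inflight[(fn, key)] = inflight.get((fn, key), 0) + 1
    return inflight[(fn, key)]


def check_for_takeoff(fn, key, flight_id):
    """Only the most recently issued flight_id may proceed."""
    latest = inflight.get((fn, key))
    return latest is None or latest == flight_id


def my_task(my_arg):
    flight_id = preflight("my_task", my_arg)
    scheduled.append((my_arg, flight_id))   # like apply_async(countdown=TTL)


def _my_task(my_arg, flight_id):
    if not check_for_takeoff("my_task", my_arg, flight_id):
        return                              # superseded by a newer call
    ran.append((my_arg, flight_id))


for _ in range(3):
    my_task("user-7")
my_task("user-8")

# Every scheduled copy eventually runs; only the latest per key proceeds
for my_arg, flight_id in scheduled:
    _my_task(my_arg, flight_id)
```

Here the most recent call per key wins, which is the trailing-edge behaviour usually meant by "debounce".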

@celery.task(track_started=False, ignore_result=True)
def my_task(my_arg):
    # HASH() is a placeholder for any stable hash of the task's arguments
    flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL)
    _my_task.apply_async((my_arg,), {'flight_id': flight_id}, countdown=TTL)

@celery.task(track_started=False, ignore_result=True)
def _my_task(my_arg, flight_id=None):
    # Abort if a newer call to my_task has superseded this one
    if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id):
        return
    # ... actual work ... #

Library code:

import datetime

import pymongo

TTL = 5 * 60  # Run tasks after 5 minutes
EXPIRY = 6 * TTL  # This needs to be much larger than TTL.

# We need to store a list of task-executions currently pending.
# `db` is assumed to be your pymongo Database, e.g. pymongo.MongoClient()['mydb'].
inflight_collection = db['celery_In_Flight']
inflight_collection.create_index([('fn', pymongo.ASCENDING),
                                  ('key', pymongo.ASCENDING)])
# TTL index: MongoDB removes a document roughly EXPIRY seconds after its eta.
# The option is spelled expireAfterSeconds.
inflight_collection.create_index('eta', expireAfterSeconds=EXPIRY)


def preflight(collection, fn, key, ttl):
    # TTL indexes work in UTC, so store a timezone-aware UTC timestamp
    now = datetime.datetime.now(datetime.timezone.utc)
    eta = now + datetime.timedelta(seconds=ttl)
    result = collection.find_one_and_update(
        {'fn': fn, 'key': key},
        {'$set': {'eta': eta}, '$inc': {'flightId': 1}},
        upsert=True,
        return_document=pymongo.ReturnDocument.AFTER,
    )
    print('Preflight[{}][{}] = {}'.format(fn, key, result['flightId']))
    return result['flightId']


def check_for_takeoff(collection, fn, key, flight_id):
    result = collection.find_one({'fn': fn, 'key': key})
    # A missing document (e.g. already expired) means nobody superseded us
    ready = result is None or result['flightId'] == flight_id
    print('Check[{}][{}] = {}, {}'.format(
        fn, key, result['flightId'] if result else None, ready))
    return ready
0

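Bartek had the right idea: use Redis counters, which are atomic (and should be easy to get if your broker is Redis). However, his solution throttles rather than debounces. The difference is small, though (getset vs. decr).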

Queueing the task:

conn = get_redis()
conn.incr(key)  # one more pending copy of this task for `key`
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

Then, in the task:

conn = get_redis()
counter = conn.decr(key)  # DECR is atomic and returns the new value
if counter > 0:
    # newer copies of this task are still queued; let the last one do the work
    return
# continue on to rest of task

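It's hard to turn this into a decorator, because you need to wrap both the task itself and the code that calls it. So you'd need one decorator applied before Celery's @task decorator and one after it.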

For now I've just written a few helper functions that I use to call the task, plus one that does the check at the start of the task.
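One possible shape for those helpers, as a sketch only: queue_debounced and should_run are hypothetical names, a FakeRedis class stands in for the INCR/DECR calls, and a callable stands in for task.apply_async. Unlike the GETSET version, the last queued copy is the one that proceeds.

```python
class FakeRedis:
    """Minimal in-memory stand-in for Redis INCR/DECR (assumption for the demo)."""

    def __init__(self):
        self.counts = {}

    def incr(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1
        return self.counts[key]

    def decr(self, key):
        self.counts[key] = self.counts.get(key, 0) - 1
        return self.counts[key]


def queue_debounced(conn, key, schedule):
    """Count the request, then schedule the task (schedule ~ apply_async)."""
    conn.incr(key)
    schedule()


def should_run(conn, key):
    """Call at the start of the task: only the last queued copy reaches 0."""
    return conn.decr(key) <= 0


conn = FakeRedis()
pending = []
for payload in ["a", "b", "c"]:
    queue_debounced(conn, "webhook:1", lambda p=payload: pending.append(p))

# Worker runs the queued copies in order; each asks whether it should proceed
results = [(p, should_run(conn, "webhook:1")) for p in pending]
```

Only the copy queued last ("c") gets True; the earlier ones return early.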

+0

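http://stackoverflow.com/a/43625455/4391298 is the solution I finally ended up with, including some key-expiry handling in case things melt down (not an issue in the original solution). – 2017-04-26 05:06:42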

0

Here's a solution I came up with: https://gist.github.com/wolever/3cf2305613052f3810a271e09d42e35c

Copied here for posterity:

import time 

import redis 


def get_redis_connection():
    # redis-py has no redis.connect(); create a client instead
    return redis.Redis()

class TaskDebouncer(object): 
    """ A simple Celery task debouncer. 

     Usage:: 

      def debounce_process_corpus(corpus): 
       # Only one task with ``key`` will be allowed to execute at a 
       # time. For example, if the task was resizing an image, the key 
       # might be the image's URL. 
       key = "process_corpus:%s" %(corpus.id,) 
       TaskDebouncer.delay(
        key, process_corpus, args=[corpus.id], countdown=0,
       ) 

      @task(bind=True) 
      def process_corpus(self, corpus_id, debounce_key=None): 
       debounce = TaskDebouncer(debounce_key, keepalive=30) 

       corpus = Corpus.load(corpus_id) 

       try: 
        for item in corpus: 
         item.process() 

         # If ``debounce.keepalive()`` isn't called every 
         # ``keepalive`` interval (the ``keepalive=30`` in the 
         # call to ``TaskDebouncer(...)``) the task will be 
         # considered dead and another one will be allowed to 
         # start. 
         debounce.keepalive() 

       finally: 
        # ``finalize()`` will mark the task as complete and allow 
        # subsequent tasks to execute. If it returns true, there 
        # was another attempt to start a task with the same key 
        # while this task was running. Depending on your business 
        # logic, this might indicate that the task should be 
        # retried. 
        needs_retry = debounce.finalize() 

       if needs_retry: 
        raise self.retry(max_retries=None) 

    """ 

    def __init__(self, key, keepalive=60): 
     if key: 
      self.key = key.partition("!")[0] 
      self.run_key = key 
     else: 
      self.key = None 
      self.run_key = None 
     self._keepalive = keepalive 
     self.cxn = get_redis_connection() 
     self.init() 
     self.keepalive() 

    @classmethod 
    def delay(cls, key, task, args=None, kwargs=None, countdown=30): 
     cxn = get_redis_connection() 
     now = int(time.time()) 
     first = cxn.set(key, now, nx=True, ex=countdown + 10) 
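     if not first:
      # redis-py returns bytes by default; decode so run_key isn't "key!b'...'"
      raw = cxn.get(key)
      now = raw.decode() if isinstance(raw, bytes) else raw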

     run_key = "%s!%s" %(key, now) 
     if first: 
      kwargs = dict(kwargs or {}) 
      kwargs["debounce_key"] = run_key 
      task.apply_async(args=args, kwargs=kwargs, countdown=countdown) 

     return (first, run_key) 

    def init(self): 
     self.initial = self.key and self.cxn.get(self.key) 

    def keepalive(self, expire=None): 
     if self.key is None: 
      return 
     expire = expire if expire is not None else self._keepalive 
     self.cxn.expire(self.key, expire) 

    def is_out_of_date(self): 
     if self.key is None: 
      return False 
     return self.cxn.get(self.key) != self.initial 

    def finalize(self): 
     if self.key is None: 
      return False 
     with self.cxn.pipeline() as pipe: 
      while True: 
       try: 
        pipe.watch(self.key) 
        if pipe.get(self.key) != self.initial: 
         return True 
        pipe.multi() 
        pipe.delete(self.key) 
        pipe.execute() 
        break 
       except redis.WatchError: 
        continue 
     return False 
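The heart of TaskDebouncer.delay is Redis's SET with NX and EX: only the first caller in the window creates the key and schedules the task, while later callers get back the same run key. A sketch of just that pattern, assuming a FakeRedis class that mimics redis-py's set(nx=..., ex=...) and get() return values; debounce_delay is a simplified, hypothetical stand-in for the classmethod.

```python
import time


class FakeRedis:
    """In-memory stand-in for SET key value NX EX and GET (assumption)."""

    def __init__(self):
        self.data = {}  # key -> (value_bytes, expires_at)

    def set(self, key, value, nx=False, ex=None):
        now = time.monotonic()
        current = self.data.get(key)
        if nx and current is not None and current[1] > now:
            return None  # redis-py returns None when NX blocks the write
        expires_at = now + ex if ex is not None else float("inf")
        self.data[key] = (str(value).encode(), expires_at)
        return True

    def get(self, key):
        current = self.data.get(key)
        if current is None or current[1] <= time.monotonic():
            return None
        return current[0]


def debounce_delay(cxn, key, schedule, countdown=30):
    """First caller in the window schedules; later callers reuse the run key."""
    now = int(time.time())
    first = cxn.set(key, now, nx=True, ex=countdown + 10)
    if not first:
        now = cxn.get(key).decode()
    run_key = "%s!%s" % (key, now)
    if first:
        schedule(run_key)  # stands in for task.apply_async(...)
    return bool(first), run_key


cxn = FakeRedis()
scheduled = []
outcomes = [debounce_delay(cxn, "process_corpus:1", scheduled.append)
            for _ in range(3)]
```

All three calls share one run key, and only the first one actually schedules work.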
0

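Here's a more fleshed-out solution based on https://stackoverflow.com/a/28157498/4391298, but turned into a decorator, and reaching into the Kombu connection pool to reuse your Redis connection for the counter.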

import logging 
from functools import wraps 

# Not strictly required 
from django.core.exceptions import ImproperlyConfigured 
from django.core.cache.utils import make_template_fragment_key 

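from celery.utils import gen_task_name

# Assumption: `app` is your project's Celery instance (hypothetical module path)
from proj.celery import app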


LOGGER = logging.getLogger(__name__) 


def debounced_task(**options): 
    """Debounced task decorator.""" 

    try: 
     countdown = options.pop('countdown') 
    except KeyError: 
     raise ImproperlyConfigured("Debounced tasks require a countdown") 

    def factory(func): 
     """Decorator factory.""" 
     try: 
      name = options.pop('name') 
     except KeyError: 
      name = gen_task_name(app, func.__name__, func.__module__) 

     @wraps(func) 
     def inner(*args, **kwargs): 
      """Decorated function.""" 

      key = make_template_fragment_key(name, [args, kwargs]) 
      with app.pool.acquire_channel(block=True) as (_, channel): 
       depth = channel.client.decr(key) 

       if depth <= 0: 
        try:
         return func(*args, **kwargs)
        except Exception:
         # The task failed (or is going to retry), set the
         # count back to where it was
         channel.client.set(key, depth)
         raise
       else: 
        LOGGER.debug("%s calls pending to %s", 
           depth, name) 

     task = app._task_from_fun(inner, **options, name=name + '__debounced') 

     @wraps(func) 
     def debouncer(*args, **kwargs): 
      """ 
      Debouncer that calls the real task. 
      This is the task we are scheduling.""" 

      key = make_template_fragment_key(name, [args, kwargs]) 
      with app.pool.acquire_channel(block=True) as (_, channel): 
       # Mark this key to expire after the countdown, in case our
       # task never runs or runs too many times, we want to clean
       # up our Redis to eventually resolve the issue.
       # INCR first: EXPIRE does nothing on a key that doesn't exist yet.
       depth = channel.client.incr(key)
       channel.client.expire(key, countdown + 10)

      LOGGER.debug("Requesting %s in %i seconds (depth=%s)", 
         name, countdown, depth) 
      task.si(*args, **kwargs).apply_async(countdown=countdown) 

     return app._task_from_fun(debouncer, **options, name=name) 

    return factory