2013-06-25 45 views
2

我有不同的兔子隊列,每個專用於一種特殊的訂單處理:是否可以使用芹菜的畫布基元的自定義路線?

# tasks.py 

@celery.task 
def process_order_for_product_x(order_id): 
    pass # elided ... 


@celery.task 
def process_order_for_product_y(order_id): 
    pass # elided ... 


# settings.py 

CELERY_QUEUES = { 
    "black_hole": { 
     "binding_key": "black_hole", 
     "queue_arguments": {"x-ha-policy": "all"} 
    }, 
    "product_x": { 
     "binding_key": "product_x", 
     "queue_arguments": {"x-ha-policy": "all"} 
    }, 
    "product_y": { 
     "binding_key": "product_y", 
     "queue_arguments": {"x-ha-policy": "all"} 
    }, 

我們必須通過設置CELERY_DEFAULT_QUEUE = 'black_hole'然後從不black_hole消耗執行顯式路由的政策。

每項任務可以使用芹菜的畫布元,像這樣:

# tasks.py 

@celery.task 
def process_order_for_product_x(order_id): 
    # These can run in parallel 
    stage_1_group = group(do_something.si(order_id), 
          do_something_else.si(order_id)) 

    # These can run in parallel 
    another_group = group(do_something_at_end.si(order_id), 
          do_something_else_at_end.si(order_id)) 

    # These run in a linear sequence 
    process_task = chain(
     stage_1_group, 
     do_something_dependent_on_stage_1.si(order_id), 
     another_group) 

    process_task.apply_async() 

假如我想要的celery.groupcelery.chordcelery.chord_unlock,等帆布任務的特定用途通過隊列流入其相應的產品,而不是被困在black_hole中,有沒有辦法用自定義任務名稱或自定義routing_key來調用每個特定的畫布任務?

由於我不會進入的原因,我寧願不發送所有celery.*任務到一個全部爲celery_canvas的隊列,這正是我在此期間所做的。

回答

4

這種方法可以讓你的路線芹菜帆布任務回調任務的隊列。

here所述,可以爲Celery指定自定義的基於類的任務路由器。

讓我們把注意力集中在celery.chord_unlock任務上。其簽名定義爲here

def unlock_chord(self, group_id, callback, ...): 

第二個位置參數是和絃回調任務的簽名。

Celery中的任務簽名基本上是dicts,所以我們有機會訪問任務選項,包括任務隊列名稱。

下面是一個例子:

class CeleryRouter(object): 
    def route_for_task(self, task, args=None, kwargs=None): 
     if task == 'celery.chord_unlock': 
      callback_signature = args[1] 
      options = callback_signature.get('options') 
      if options: 
       queue = options.get('queue') 
       if queue: 
        return {'queue': queue} 

它加入芹菜配置:

CELERY_ROUTES = (CeleryRouter(), 
0

我目前在我的項目中使用芹菜。對於某些情況下,我需要的任務鏈,雖然不同的隊列:宣佈像這樣

chain(get_staff.s(url), save_staff.s(dt, partner_id, url))() 

這兩個功能:

@task(queue='celery_gevent') 
def get_staff(source_url): 

@task # send to default queue 
def save_staff(suggests, dt, partner, url): 

順便說一句,celery_gevent由工人用GEVENT池發出HTTP處理要求。

這個例子,你如何指定隊列含蓄。您也可以通過指定其他參數,可以像這樣在不同的隊列明確放任務:

In [1]: add.apply_async([4,5]) 
Out[1]: <AsyncResult: bda3dedd-c2c4-44db-be8e-6a97e718f8b0> 

$ sudo rabbitmqctl list_queues 
Listing queues ... 
celery 1 
...done. 

In [2]: add.apply_async([4,5], queue='your_product') 
Out[2]: <AsyncResult: 934f6161-298b-468b-9716-3da6fae58fa5> 

$ sudo rabbitmqctl list_queues 
Listing queues ... 
celery 1 
your_product 1 
...done. 

您可以在自定義的隊列中運行整個畫布:

process_task.apply_async(queue='your_queue') 

嘗試指定內部@task裝飾queue_name 。這應該有所幫助。

鏈接:

http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

http://docs.celeryproject.org/en/latest/_modules/celery/app/task.html#Task.apply_async

相關問題