2016-04-01 46 views
2

我正在用Celery替換一些自主開發的代碼,但難以複製當前的行爲。我期望的行爲如下:使用主題交換運行多個芹菜任務

  • 當創建一個新用戶,應該要有一個消息發佈到與user.created路由鍵tasks交換。
  • 這條消息應觸發兩個芹菜任務,即send_user_activate_emailcheck_spam

我試圖通過定義一個ignore_result=True爭論一個user_created任務實現這一點,再加上send_user_activate_emailcheck_spam任務。

在我的配置中,我添加了以下路由和隊列定義。郵件傳遞到user_created隊列時,郵件不會傳遞到其他兩個隊列。

理想情況下,該消息僅傳遞到send_user_activate_emailcheck_spam隊列。當使用vanilla RabbitMQ時,消息被髮布到交換機,隊列可以綁定到該交換機,但是Celery似乎直接向隊列傳遞消息。

我將如何實現上面在Celery中列出的行爲?

CELERY_QUEUES = { 
    'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'}, 
    'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'}, 
    'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'}, 
} 

CELERY_ROUTES = { 
    'user_created': { 
     'queue': 'user_created', 
     'routing_key': 'user.created', 
     'exchange': 'tasks', 
     'exchange_type': 'topic', 
    }, 
    'send_user_activate_email': { 
     'queue': 'user_created', 
     'routing_key': 'user.created', 
     'exchange': 'tasks', 
     'exchange_type': 'topic', 
    }, 
    'check_spam': { 
     'queue': 'user_created', 
     'routing_key': 'user.created', 
     'exchange': 'tasks', 
     'exchange_type': 'topic', 
    }, 
} 

回答

0

這聽起來像你期待一個消息觸發/被兩個隊列消耗,但這不是芹菜如何工作。 Exchange會將任務發佈到符合條件的隊列,但一旦它被使用,其他隊列將忽略該消息。每個需要觸發的任務都需要一條消息。

由於在這個系統中有兩種使用「隊列」的功能,所以常常與新的Celery用戶混淆。 Queue()和文檔引用的Kombu隊列,以及直接保存消息並由工作人員使用的AMQP隊列。當我們發佈到隊列中時,我們會想到AMQP的,這是不正確的。 (感謝下面的鏈接)。

回到你的問題,如果我正確理解,當user_created被消耗時,你希望它產生兩個更多的任務; send_user_activate_email和check_spam。此外,這些不應該相互依賴;他們可以在不同的機器上並行運行,不需要知道彼此的狀態。

在這種情況下,您希望user_created將這兩個新任務「apply_async」並返回。這可以直接完成,也可以使用包含check_spam和send_user_activate_email的Celery「Group」來實現。這個小組給出了一些很好的速記,併爲你的任務提供了一些結構,所以親自推動你的發展方向。

#pseudocode 
group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async() 

此設置將創建四條消息;每個要執行的任務加上一個用於Group(),它本身就會有結果。

就你而言,我不確定Exchange或ignore_result是否必要,但我需要查看任務代碼並理解系統才能作出判斷。

http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups http://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys Why do CELERY_ROUTES have both a "queue" and a "routing_key"?

(如果我的路要走,我會刪除/刪除答案...)

+0

感謝您的詳細解釋。從您的答案和文檔中收集信息,Celery使用routing_key在工作人員之間分配任務,而不是讓多個任務響應單個消息。這基本上會迫使你緊密耦合觸發任務和處理任務的代碼。它是否正確? – joelcox

+0

@ joelcox,我認爲這是一個很好的總結。這條規則的例外是Map()和Starmap(),我相信它爲序列中的每個元素執行任務,但只發送一條消息。如果你想讓任務相互響應(比如等待另一個成功,因爲它需要元數據繼續),你還可以查看Chain(),Chord()。 –

0

最簡單的方式dessign和解決你的問題是usign芹菜的工作流程。
但首先我會改變你的隊列定義,爲每個任務設置一個唯一的路由鍵,並用'直接'值設置exchange_type。

celery documentation據,直接交流的確切路由鍵匹配,所以我們設置了相同的交換所有自定義任務和消費者的隊列,我們​​routing_key地圖(對於任務)和binding_key(用於隊列)之類的下一個片段:

CELERY_QUEUES = { 
    'user_created': {'binding_key':'user_created', 'exchange': 'tasks', 'exchange_type': 'direct'}, 
    'send_user_activate_email': {'binding_key':'send_user_activate_email', 'exchange': 'tasks', 'exchange_type': 'direct'}, 
    'check_spam': {'binding_key':'check_spam', 'exchange': 'tasks', 'exchange_type': 'direct'}, 
} 

CELERY_ROUTES = { 
    'user_created': { 
     'queue': 'user_created', 
     'routing_key': 'user_created', 
     'exchange': 'tasks', 
     'exchange_type': 'direct', 
    }, 
    'send_user_activate_email': { 
     'queue': 'send_user_activate_email', 
     'routing_key': 'send_user_activate_email', 
     'exchange': 'tasks', 
     'exchange_type': 'direct', 
    }, 
    'check_spam': { 
     'queue': 'check_spam', 
     'routing_key': 'check_spam', 
     'exchange': 'tasks', 
     'exchange_type': 'direct', 
    }, 
} 

完成此更改後,您需要爲可用列表(http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives)使用適當的工作流程。讀你的問題我認爲你需要一個鏈,因爲訂單需要保存。

sequential_tasks = [] 
sequential_tasks.append(user_created.s(**user_created_kwargs)) 
sequential_tasks.append(send_user_activate_email.s(**send_user_activate_email_kwargs)) 
sequential_tasks.append(check_spam.s(**check_spam_kwargs)) 
#you can add more tasks to the chain 
chain(*sequential_tasks)() 

芹菜將透明地處理與隊列有關的工作。

+0

你能解釋爲什麼我需要爲每個任務單獨交換嗎?如果有問題,send_user_activate_email和check_spam任務可以並行運行。 – joelcox

+0

看看我最後的編輯 – xecgr