我正在用Celery替換一些自主開發的代碼,但難以複製當前的行爲。我期望的行爲如下:使用主題交換運行多個芹菜任務
- 當創建一個新用戶,應該要有一個消息發佈到與
user.created
路由鍵tasks
交換。 - 這條消息應觸發兩個芹菜任務,即
send_user_activate_email
和check_spam
。
我試圖通過定義一個ignore_result=True
爭論一個user_created
任務實現這一點,再加上send_user_activate_email
和check_spam
任務。
在我的配置中,我添加了以下路由和隊列定義。郵件傳遞到user_created
隊列時,郵件不會傳遞到其他兩個隊列。
理想情況下,該消息僅傳遞到send_user_activate_email
和check_spam
隊列。當使用vanilla RabbitMQ時,消息被髮布到交換機,隊列可以綁定到該交換機,但是Celery似乎直接向隊列傳遞消息。
我將如何實現上面在Celery中列出的行爲?
CELERY_QUEUES = {
'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'send_user_activate_email': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'check_spam': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
}
感謝您的詳細解釋。從您的答案和文檔中收集信息,Celery使用routing_key在工作人員之間分配任務,而不是讓多個任務響應單個消息。這基本上會迫使你緊密耦合觸發任務和處理任務的代碼。它是否正確? – joelcox
@ joelcox,我認爲這是一個很好的總結。這條規則的例外是Map()和Starmap(),我相信它爲序列中的每個元素執行任務,但只發送一條消息。如果你想讓任務相互響應(比如等待另一個成功,因爲它需要元數據繼續),你還可以查看Chain(),Chord()。 –