2012-07-02 92 views
1

我在芹菜(與RabbitMQ)中有一個記錄器,並且想要複製緊急情況下的工作。如何複製Django芹菜工人?

# tasks.py 
@task 
def log(message): 
    with open('test.txt', 'a') as f: 
     f.write(message) 


# views.py 
log.delay(message) 

如何在不同的機器上運行log()調用Celery的2個實例?

這樣做有意義嗎?

這是RabbitMQ中可能的。如果你有一個topic-based exchange,很明顯,一條消息可以放入兩個不同的隊列,並獨立傳送給2個接收器。

sender => 
[message, routing_key=event.logging.log] => [queue A, topic=event.#]  
                 => receiver 1 
             => [queue B, topic=*.logging.*] 
                 => receiver 2 

消息將被髮送到兩個隊列,並且他們都不會從另一個隊列竊取消息。

回答

1

要做到這一點,你就必須配置Exchange是一個話題交換(如你所說):

from celery import current_app as celery 

with celery.broker_connection() as conn: 
    conn.default_channel.queue_declare(queue='celery.backup', durable=True) 
    conn.default_channel.queue_bind(queue='celery.backup', 
            exchange='celerytopic', 
            routing_key='celery', 
            durable=True) 

CELERY_QUEUES = { 
    'celery': { 
     'exchange': 'celerytopic', 
     'exchange_type': 'topic', 
     'routing_key': 'celery', 
    }, 
} 

然後你可以使用AMQP API創建備份交換機

由於您已經有一個名爲芹菜的隊列,您可能必須先刪除它:

$ camqadm queue.delete celery 
1

對我來說,嘗試在兩臺不同的機器上啓動這個任務是沒有意義的。至少Celery不能保證任務將在不同的機器上運行 - 它是分配負載的RabbitMQ,並且如果一個節點的負載比其他機器少 - 運行的兩個任務將可能在該機器上執行...

改爲使用task.retry。如果Celery不能執行,它將重試一項任務。芹菜很聰明,可以理解任務是否失敗。只要確保在任務失敗時引發一些異常,並且如果無法成功記錄,則不要默默返回。

UPDATE:

一個可能的工作流程可能是 - 嘗試執行任務,如果失敗,在on_retry改變routing_key,並嘗試在不同的交換/隊列執行任務,可以是您的故障轉移隊列。

+0

我會在RabbitMQ上添加註釋。 –

+1

我的觀點是,如果在同等配置的節點之間進行簡單的負載平衡,則無法選擇哪個節點將執行任務。至少這是通常的故障轉移行爲。如果您將任務限制在特定節點上運行 - 那麼您將自動失去故障切換,並且綁定到選定節點。如果他們都死了..你仍然有運行節點,但不會使用它們。或者簡單地說 - 如果它會失敗 - 你將很難達到啓動2個同步任務的靈活性。你會採用更安全的方法 - 即使用on_retry()函數來處理重試。 – Tisho

+0

對,這很有道理。 –