2017-05-11 202 views
0

我有,我安裝了RabbitMQ的經紀人和兩個芹菜消費者(main1.pymain2.py)都連接到同一代理服務器。芹菜任務

在第一消費者(main1.py),我實現發送多次在一個特定的隊列不同任務中的芹菜節拍:

app = Celery('tasks', broker=..., backend=...) 
app.conf.task_routes = (
    [ 
     ('tasks.beat', {'queue': 'print-queue'}), 
    ], 
) 
app.conf.beat_schedule = { 
    'beat-every-10-seconds': { 
     'task': 'tasks.beat', 
     'schedule': 10.0 
    }, 
} 

@app.task(name='tasks.beat', bind=True) 
def beat(self): 
    for i in range(10): 
     app.send_task("tasks.print", args=[i], queue="print-queue") 

    return None 

在第二消費者(main2.py ),我實現的任務上面說:

app = Celery('tasks', broker=..., backend=...) 
app.conf.task_routes = (
    [ 
     ('tasks.print', {'queue': 'print-queue'}), 
    ], 
) 

@app.task(name='tasks.print', bind=True) 
def print(self, name): 
    return name 

當我開始兩個芹菜工人:

consumer1: celery worker -A main1 -Q print-queue --beat 
consumer2: celery worker -A main2 -Q print-queue 

我得到這些錯誤:在第二消費

[ERROR/MainProcess] Received unregistered task of type 'tasks.print' 

第一消費

[ERROR/MainProcess] Received unregistered task of type 'tasks.beat' 

是否可以工作在兩個連接到不同的芹菜應用拆分同一個經紀人?

在此先感謝!

回答

1

這裏發生了什麼。你有兩個工人AB其中之一也恰好運行芹菜拍(比方說,一個是B)。

  1. 芹菜拍提交task.beat到隊列。所有這些工作都是通過一些元數據(包括任務的名稱)在兔子中輸入消息。
  2. 其中一名員工閱讀該消息。 A和B都在監聽同一隊列,因此可以讀取它。

    a。如果A讀取消息,它將嘗試找到名爲tasks.beat的任務,因爲A不定義該任務。

    b。如果B讀取該郵件是否將成功嘗試找到任務調用tasks.beat(因爲它確實有任務),並運行代碼。 tasks.beat將會在包含tasks.print的元數據的兔子中入列新消息。將再次發生

  3. 同樣的問題,因爲只有A和B之一限定tasks.print但任何可以得到的消息。

在實踐中,芹菜可能會做一些檢查以提前發出錯誤消息,但我相當肯定這是潛在的問題。

總之,所有的工人隊列(包括拍)應該運行相同的代碼。

+0

感謝@mbattifarano,我完全明白你的意思。我使用兩個不同的隊列解決了這個問題,一個用於** tasks.beat **,另一個用於** tasks.print **。 – alauri