2015-09-02 99 views
4
from celery import Celery 

app = Celery('tasks', backend='amqp://[email protected]//', broker='amqp://[email protected]//') 

a_num = 0 

@app.task 
def addone(): 
    global a_num 
    a_num = a_num + 1 
    return a_num 

這是我用來測試芹菜的代碼。 我希望每次使用addone()時都會增加返回值。 但它總是1 爲什麼?芹菜不適用於全局變量

結果

python 
>> from tasks import addone 
>> r = addone.delay() 
>> r.get() 
    1 
>> r = addone.delay() 
>> r.get() 
    1 
>> r = addone.delay() 
>> r.get() 
    1 

回答

12

默認情況下,當一個工作器啓動時,Celery以併發4啓動它,這意味着它有4個進程開始處理任務請求。 (加上控制其他進程的進程)。我不知道使用什麼算法將任務請求分配給爲工作人員啓動的進程,但最終,如果執行addone.delay().get()就夠了,您會看到數字大於1 。會發生什麼情況是每個進程(並非每個任務)都獲得其自己的副本a_num。當我在這裏嘗試時,我的第五次執行addone.delay().get()返回2

您可以通過啓動您的工作人員使用單個進程處理請求來強制每次增加數量。 (例如,celery -A tasks worker -c1)但是,如果您曾經重啓過您的工作人員,則編號將重置爲0.此外,我不會設計僅在處理請求的進程數量爲1時才起作用的代碼。同事決定多個進程應該處理這些任務的請求,然後事情就會中斷。 (代碼中註釋中的大胖警告只能做很多事情。)

在一天結束時,這種狀態應該在緩存中共享,如Redis或用作緩存的數據庫,爲你的問題中的代碼。

然而,在評論你寫道:

讓我們看看我想用一個任務來送東西。我不想每次都連接任務,我想分享一個全球連接。

將連接存儲在緩存中將不起作用。我強烈建議讓Celery開始的每個流程都使用自己的連接,而不是嘗試在流程之間共享。 每個新的任務請求都不需要重新打開連接。每個進程打開一次,然後該進程提供的每個任務請求重用連接。

在很多情況下,嘗試在進程之間共享相同的連接(例如通過共享虛擬內存(例如,通過fork))將無法繼續工作。連接通常帶有狀態(例如,數據庫連接是否處於自動提交模式)。如果代碼的兩部分期望連接處於不同的狀態,則代碼將不一致地運行。

-2

的任務將每次運行異步所以它會啓動新任務a_num將被設置爲0。它們運行作爲單獨的實例。

如果你想使用值,我建議一個價值商店或某種數據庫。

+0

我該如何分享價值?讓我們看看我想用任務發送一些東西。我不想每次都連接任務,我想分享一個全球連接。所以任務可以使用相同的連接。 – xren

+0

我的猜測是你不能那樣做,但可能會有一些我不知道的魔法。 – olofom

+0

它與異步有什麼關係? – spacediver