2013-01-24 88 views
1

在我的Django網絡應用程序中,我有一個工作程序,它是速率受限的API的客戶端,負責處理來自服務器線程的對該API的所有請求。我使用我的數據庫來存儲任務隊列。任務可以大批量進行,或根本不會。我正在使用事件循環來輪詢隊列並管理任務之間的延遲,以防萬一超過速率限制(限制是動態的)。這一切都可以正常工作,但是我唯一想做的其他事情是讓工作人員在隊列變干時停止點擊數據庫,並且讓我的Django應用程序向工作人員發出隊列不再存在的信號再次乾燥。Django應用程序的工作進程的異步事件

示意性地,在僞Python的它看起來像這樣:

state = NORMAL 
delay_time = NORMAL_DELAY 

while True: 
    sleep(delay_time) 

    if state == DORMANT: 
     continue 

    task = get_next_task() # hits database 
    if task is None: 
     state = DORMANT 
     delay_time = NORMAL_TIME 

    try: 
     execute(task) 
    except RateExceeded: 
     delay_time = backoff(delay_time) 
    else: 
     delay_time = NORMAL_DELAY 

# Triggered by web layer 
def asynchronous_event(): 
    state = NORMAL 

我要麼需要一個異步事件從可設置狀態恢復到正常(這將在sleep執行)網絡層或一些觸發其他輕量級檢查,不會增加不必要的循環DB查詢。

在單機設置中,我只能使用信號,但顯然這在多機設置中不起作用。我試圖不必爲了這個信號的目的而運行單獨的消息隊列服務器。我使用Dotcloud進行託管,以便可以使用基於網絡的解決方案。理想情況下,與信號處理程序一樣易於實現。我研究過ZeroRPC,但是我不確定如何將它合併到我的事件循環中。

任何想法?

編輯

我期待到ZeroMQ來解決這個問題,但我可以使用一些幫助。棘手的部分是將有多個併發的webserver實例,並且在重新部署時,我需要從一個工作者到其後繼者的平穩過渡。因此,請耐心等待,因爲我的術語可能不正確,在我看來,最好的做法是讓每個工作人員異步地將其作爲郵箱進行綁定,並在主循環中將其從休眠模式中喚醒。每名工人在其IP的數據庫中創建一個記錄,並用創建日期加蓋時間戳。提交請求時,Web服務器向所有工作人員發佈消息。當工作人員收到一條消息時,它會檢查它是否具有最新創建日期的工作人員:如果是,則會處理該消息,如果沒有,則會自行終止。

這看起來很麻煩,但我想要正確,因爲我可能會在我的應用程序的其他地方使用這種模式。

+0

如果您已經在使用Redis,則可以將其用作輕量級隊列。這會將它從數據庫中一起移除,而且速度非常快。 –

+0

聽起來不錯,但我還沒有使用Redis。如果這是唯一不錯的選擇,我可能只需要把它弄糟。 – acjay

+0

如果您正在查看ZeroMQ並且您正在使用Python,則應該查看ZeroRPC。它有助於抽象出很多ZeroMQ的複雜性。 –

回答

0

事實證明,我決定只是每次循環都敲擊數據庫。但是我也決定,如果我想要努力實現這個目標,ZeroMQ就是一條非常有效的途徑。以下是它的工作原理:

每個工作人員綁定一個ZeroMQ用戶套接字,並將其自己註冊到工作人員的數據庫中,其中包含套接字的IP地址和端口。 Web線程將DO_TASK消息發佈給最近註冊的工作人員,並將QUIT消息發佈給可能正在工作的任何其他人。

我在部署Dotcloud,他們的支持說使用custom service環境變量和構建選項可以讓我打開必要的端口並獲取工作任務實例的IP。

0

如果數據庫中沒有任務,如何將指數退避應用到工作作業的delay_time?這可能會使您足夠減少數據庫負載,而無需將消息從Web應用程序傳輸到工作程序的額外複雜性。例如:

delay_time = NORMAL_DELAY 

while True: 
    sleep(delay_time) 
    task = get_next_task() # hits database 

    if task: 
     try: 
      execute(task) 
     except RateExceeded: 
      pass 
     else: 
      delay_time = NORMAL_DELAY 
      continue 

    delay_time = backoff(delay_time) 
+0

問題在於當任務進入時我需要快速響應 – acjay

相關問題