2011-06-16 33 views
1

我有一些關於使用Python和Redis創建作業隊列應用程序以運行異步命令的一般性問題。這裏是代碼我迄今產生:Python&Redis:經理/工作者應用程序最佳實踐

def queueCmd(cmd): 
    r_server.rpush("cmds", cmd) 

def printCmdQueue(): 
    print r_server.lrange("cmds", 0 , -1) 

def work(): 
    print "command being consumed: ", r_server.lpop("cmds") 
    return -1 

def boom(info): 
    print "pop goes the weasel" 

if __name__ == '__main__': 

    r_server = redis.Redis("localhost") 

    queueCmd("ls -la;sleep 10;ls") 
    queueCmd("mkdir test; sleep 20") 
    queueCmd("ls -la;sleep 10;ls") 
    queueCmd("mkdir test; sleep 20") 
    queueCmd("ls -la;sleep 10;ls") 
    queueCmd("mkdir test; sleep 20") 

    printCmdQueue() 

    pool = Pool(processes=2) 

    print "cnt:", +r_server.llen("cmds") 
    #while r_server.llen("cmds") > 0: 
    while True: 
     pool.apply_async(work, callback=boom) 
     if not r_server.lrange("cmds", 0, -1): 
     #if r_server.llen("cmds") == 0: 
      print "Terminate pool" 
      pool.terminate() 
      break 

    printCmdQueue() 

首先,我在相信糾正,如果我需要做任何溝通的經理,我想用一個回調做呢?我在這個使用中看到的快速示例將異步調用存儲在結果中,並通過result.get(timeout = 1)訪問它。通過溝通,我的意思是把東西放回到redis列表中。

編輯:如果該命令在異步運行,並且我對主內部結果超時,那麼超時工人還是隻是在管理器內的操作?如果只有經理不能使用它來檢查工作人員的退出代碼嗎?

接下來,該代碼產生以下輸出:

['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20'] 
command being consumed: ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20'] 
pop goes the weasel 
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20'] 
command being consumed: mkdir test; sleep 20 
pop goes the weasel 
pop goes the weasel 
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20'] 
pop goes the weasel 
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20'] 
command being consumed: mkdir test; sleep 20 
Terminate pool 
command being consumed: None 
pop goes the weasel 
pop goes the weasel 
pop goes the weasel 
[] 

爲什麼工人想在即使我坡平趕走一個在時間的消耗多CMDS?在類似的情況下,這並不總是很好地結束,有時需要一個ctrl + c。爲了解決他的問題,我把隊列清理出去,然後再去。我認爲這涉及到apply_sync()以及是否退出循環。我想知道是否還需要在工人方面發生?

如果我改變註釋掉IFS的一個,我得到:

ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls' 

這似乎是它會是一個更好的方法來檢查,看看是否我需要打破,但似乎函數返回一個字符串文字有時?

任何意見,以改善這將不勝感激。我只是試圖讓一個經理在Linux機器上像一個服務/守護進程一樣。它將用於從redis列表中獲取作業(當前的命令但可能更多),並將結果返回到redis列表中。然後,GUI將與該經理交互以獲取隊列狀態並返回結果。

感謝,

編輯:

我意識到我是一個有點混日子的。我不需要從工作者訪問redis服務器,這導致了一些錯誤(特別是ValueError)。

爲了解決這個問題,現在的循環是:

while not r_server.llen("cmds") == 0: 
    cmd = r_server.lpop("cmds") 
    pool.apply_async(work, [cmd]) 

這些行我打電話pool.close()後。我用os.getpid()os.getppid()來檢查我是否確實有多個孩子跑來跑去。

如果這聽起來像是一種創建使用redis的管理員/工作者應用程序的好方法,我仍然會很開心。

回答

2

你的問題是,你正嘗試同時運行多個命令與一個單一的redis連接。

你期待像

Thread 1  Thread 2 
LLEN test  
1        
LPOP test 
command  
      LLEN test 
      0 

,但你得到

Thread 1  Thread 2 
LLEN test  
1        
LPOP test 
      LLEN test 
      command 
0 

結果回來以相同的順序,但並沒有什麼線程或命令鏈接到一個特定的結果。單獨的redis連接不是線程安全的 - 每個工作線程需要一個。

如果您不適當地使用流水線,也可以看到類似的問題 - 它被設計用於只寫場景,比如將大量項目添加到列表中,通過假設LPUSH成功而不是等待服務器告訴您它在每個項目之後成功。 Redis仍然會返回結果,但它們不一定是上次發送的命令的結果。

除此之外,基本的方法是合理的。有一對夫婦的增強,你可以儘管做:

  • ,而不是檢查長度,只需要使用非阻塞LPOP - 如果返回null,該列表是空
  • 添加一個定時器,使得如果列表是空的,它會等待,而不是隻發出另一個命令。
  • 包括在while循環條件的取消檢查
  • 手柄連接錯誤 - 我使用一個外循環設置,這樣,如果連接失敗,工人將嘗試重新連接(基本上重啓主要)爲合理數量嘗試完全終止工作進程。
+0

謝謝湯姆。正如我在編輯中所說的那樣,我想出了一個辦法,通過向工作人員發送參數發送隊列信息。但是,我還寫了一個替代版本,在其中創建了工作端的連接(如您所描述的)。我馬上看到的主要區別是回調觸發了多少次。如果您不介意,您是否可以闡明這些方法之間的一些差異?看起來工作人員的開銷似乎更大,因爲那裏有服務器連接。此外,我計劃實施一些日誌記錄,這是實施的一個主要關注點。 – 2011-06-17 06:03:31

+0

連接相當輕量級,所以我不用擔心開銷--Redis可以處理數千個連接,並且您的客戶端無法處理數千個線程。在工作人員創建一個連接__in__與爲工作人員創建一個__for_稍有不同。觸發的區別僅在於「命令被使用:無」的區別嗎?如果你有相同的線程同時調用LLEN和LPOP,那麼這種情況就不太可能發生。 – 2011-06-17 06:29:22

+0

我(經常)不再有'正在使用的命令:無'。在管理器中只有一個redis連接的情況下,如果'llen'!= 0,它只調用'apply_async'。在每個工作者都有自己的連接'llen'和'lpop'的情況下調用正如你所說的那樣,這名工人的可能性較小,但有時候還有一名工人「無」。我傾向於1傳遞命令,然後使用回調的Redis連接。當我在後者中添加日誌記錄時,我需要每個工作者的記錄器(類似於redis連接),這是正確的嗎? – 2011-06-17 16:48:26