我有一些關於使用Python和Redis創建作業隊列應用程序以運行異步命令的一般性問題。這裏是代碼我迄今產生:Python&Redis:經理/工作者應用程序最佳實踐
def queueCmd(cmd):
r_server.rpush("cmds", cmd)
def printCmdQueue():
print r_server.lrange("cmds", 0 , -1)
def work():
print "command being consumed: ", r_server.lpop("cmds")
return -1
def boom(info):
print "pop goes the weasel"
if __name__ == '__main__':
r_server = redis.Redis("localhost")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
printCmdQueue()
pool = Pool(processes=2)
print "cnt:", +r_server.llen("cmds")
#while r_server.llen("cmds") > 0:
while True:
pool.apply_async(work, callback=boom)
if not r_server.lrange("cmds", 0, -1):
#if r_server.llen("cmds") == 0:
print "Terminate pool"
pool.terminate()
break
printCmdQueue()
首先,我在相信糾正,如果我需要做任何溝通的經理,我想用一個回調做呢?我在這個使用中看到的快速示例將異步調用存儲在結果中,並通過result.get(timeout = 1)訪問它。通過溝通,我的意思是把東西放回到redis列表中。
編輯:如果該命令在異步運行,並且我對主內部結果超時,那麼超時工人還是隻是在管理器內的操作?如果只有經理不能使用它來檢查工作人員的退出代碼嗎?
接下來,該代碼產生以下輸出:
['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
Terminate pool
command being consumed: None
pop goes the weasel
pop goes the weasel
pop goes the weasel
[]
爲什麼工人想在即使我坡平趕走一個在時間的消耗多CMDS?在類似的情況下,這並不總是很好地結束,有時需要一個ctrl + c。爲了解決他的問題,我把隊列清理出去,然後再去。我認爲這涉及到apply_sync()以及是否退出循環。我想知道是否還需要在工人方面發生?
如果我改變註釋掉IFS的一個,我得到:
ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'
這似乎是它會是一個更好的方法來檢查,看看是否我需要打破,但似乎函數返回一個字符串文字有時?
任何意見,以改善這將不勝感激。我只是試圖讓一個經理在Linux機器上像一個服務/守護進程一樣。它將用於從redis列表中獲取作業(當前的命令但可能更多),並將結果返回到redis列表中。然後,GUI將與該經理交互以獲取隊列狀態並返回結果。
感謝,
編輯:
我意識到我是一個有點混日子的。我不需要從工作者訪問redis服務器,這導致了一些錯誤(特別是ValueError)。
爲了解決這個問題,現在的循環是:
while not r_server.llen("cmds") == 0:
cmd = r_server.lpop("cmds")
pool.apply_async(work, [cmd])
這些行我打電話pool.close()
後。我用os.getpid()
和os.getppid()
來檢查我是否確實有多個孩子跑來跑去。
如果這聽起來像是一種創建使用redis的管理員/工作者應用程序的好方法,我仍然會很開心。
謝謝湯姆。正如我在編輯中所說的那樣,我想出了一個辦法,通過向工作人員發送參數發送隊列信息。但是,我還寫了一個替代版本,在其中創建了工作端的連接(如您所描述的)。我馬上看到的主要區別是回調觸發了多少次。如果您不介意,您是否可以闡明這些方法之間的一些差異?看起來工作人員的開銷似乎更大,因爲那裏有服務器連接。此外,我計劃實施一些日誌記錄,這是實施的一個主要關注點。 – 2011-06-17 06:03:31
連接相當輕量級,所以我不用擔心開銷--Redis可以處理數千個連接,並且您的客戶端無法處理數千個線程。在工作人員創建一個連接__in__與爲工作人員創建一個__for_稍有不同。觸發的區別僅在於「命令被使用:無」的區別嗎?如果你有相同的線程同時調用LLEN和LPOP,那麼這種情況就不太可能發生。 – 2011-06-17 06:29:22
我(經常)不再有'正在使用的命令:無'。在管理器中只有一個redis連接的情況下,如果'llen'!= 0,它只調用'apply_async'。在每個工作者都有自己的連接'llen'和'lpop'的情況下調用正如你所說的那樣,這名工人的可能性較小,但有時候還有一名工人「無」。我傾向於1傳遞命令,然後使用回調的Redis連接。當我在後者中添加日誌記錄時,我需要每個工作者的記錄器(類似於redis連接),這是正確的嗎? – 2011-06-17 16:48:26