0
我一直在評估ZooKeeper作爲一個簡單的消息隊列,我寫了兩個非常簡單的腳本:mq feeder和mq consumer。饋線,下面,是精心推出20個職位的隊列,然後監視隊列狀態(工種被消耗):ZooKeeper和基於Python的消息隊列中的競爭條件
from kazoo.client import KazooClient
zk = KazooClient(hosts='xxx')
zk.start()
for i in xrange(20):
zk.create("/queue/%s" % i, b"%s" % i)
while 1:
print zk.get_children('/queue')
消費者,下面,正在啓動幾次(最多3個併發進程中我的測試),並取任務列表,在它迭代找開鎖的工作,處理它(休眠隨機數秒,以模擬一些工作),一旦完成,將刪除該作業,然後刪除該鎖:
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from time import sleep
import random
zk = KazooClient(hosts='xxx')
zk.start()
zk.ensure_path("/locks")
zk.ensure_path("/queue")
while 1:
jobs = sorted(zk.get_children('/queue'))
if jobs:
for i in jobs:
print "Checking job: %s" % i
try:
zk.create("/locks/%s" % i)
except NodeExistsError:
print "Job is locked, skipping!"
pass
else:
print "Job is unlocked, processing."
sleep(random.randrange(5))
zk.delete("/queue/%s" % i)
print "Deleted processed job, deleting the lock."
zk.delete("/locks/%s" % i)
pass
else:
print "There's no locks in the queue."
pass
我看到了,我是無法追蹤的問題是,消費者的過程與退出:
Traceback (most recent call last):
File "zk_consumer.py", line 24, in <module>
zk.delete("/queue/%s" % i)
File "/Library/Python/2.7/site-packages/kazoo/client.py", line 1055, in delete
return self.delete_async(path, version).get()
File "/Library/Python/2.7/site-packages/kazoo/handlers/threading.py", line 107, in get
raise self._exception
kazoo.exceptions.NoNodeError: ((), {})
而最後的進程仍然永遠檢查單的工作,留在隊列中,但始終處於鎖定狀態。很明顯,我在這裏遇到了一些邏輯錯誤,我認爲這會導致競爭狀態,但我花了一些時間在上面,而我似乎無法發現它。我在這裏做錯了什麼,或者ZooKeeper不是簡單工作隊列的可行解決方案?
你是正確的,但我也發現了有一個更正確的以實現我所需要的,也就是說,使用Kazoo的LockingQueue配方,如https://kazoo.readthedocs.org/en/latest/api/recipe/queue.html#kazoo.recipe.queue.LockingQueue – SpankMe 2013-05-04 20:38:27