我正在學習AMQP,所以這可能是我在做什麼的誤解。我正嘗試在我自己的私人服務器上設置rabbitmq,以便爲系統服務器提供服務(我有一堆需要處理的圖像)。我設置了三步管道:AMQP/RabbitMQ多個消費者,但只有一個geting工作?
| put image on queue | --work_queue--> | process | --results--> | archive results |
我安裝了rabbitmq,並在服務器上創建了兩個隊列。 'work_queue'和'結果'。我使用amqplib編寫了一些python腳本,並且使用一個圖像處理器工作程序,我的管道工作得很好。我已將100張圖片添加到隊列中,並且我的一臺機器正在愉快地抓取一張圖片,並將數據生成並將結果放入結果隊列中。
問題是,我假設如果我開始另一臺機器上的另一個圖像處理工作,它會簡單地將工作從隊列中拉出。這似乎是rabbitmq站點上「工作隊列」教程中列出的確切情況。我預計這只是工作。然而,真正發生的事情是,無論我開始有多少其他工作人員,他們都會永遠等待,並且永遠不會有任何工作,即使work_queue中有很多消息在等待。
我誤解了什麼?相關代碼隊列服務器上的工作:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
....
msg = { 'filename': os.path.basename(i) }
chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
routing_key='work_queue')
而且在過程中工人消費端:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
def work_callback(msg):
....
while True:
chan.basic_consume(callback=work_callback, queue='work_queue')
try:
chan.wait()
except KeyboardInterrupt:
print "\nExiting cleanly"
sys.exit(0)
我可以看到其他工人正在連接:
$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue 246
...done.
$ sudo rabbitmqctl list_connections
Listing connections ...
pipeline 192.168.8.1 41553 running
pipeline XX.YY.ZZ.WW 46676 running
pipeline 192.168.8.4 44482 running
pipeline 192.168.8.6 41884 running
...done.
哪裏XX.YY.ZZ.WW是一個外部IP。 192.168.8.6的工作人員在隊列中攪動,但外部IP上的工作人員在那裏閒置,而有246個消息在隊列中等待其應該等待的時間。
想法?
迷人的。如果兩個「工人」在我提交工作,然後兩個工作。或者甚至更加狡猾,如果只有一個人在提交工作時被連接,就好像所有的工作都安排在那個工人那裏......如果另一個工人連接,它只能工作IFF我殺死原來的工人......然後新的一個人獲得大約一半的工作?如果我重新連接原來的一個,它會得到剩下的一半?離奇! – clemej
我希望將工作放在一個隊列中,並且每個工作人員連接到隊列並一次抓取第一個條目。它看起來相反,它會得到所有的工作。也許我不明白隊列和渠道? – clemej