2013-08-30 163 views
1

我正在學習AMQP,所以這可能是我在做什麼的誤解。我正嘗試在我自己的私人服務器上設置rabbitmq,以便爲系統服務器提供服務(我有一堆需要處理的圖像)。我設置了三步管道:AMQP/RabbitMQ多個消費者,但只有一個geting工作?

| put image on queue | --work_queue--> | process | --results--> | archive results | 

我安裝了rabbitmq,並在服務器上創建了兩個隊列。 'work_queue'和'結果'。我使用amqplib編寫了一些python腳本,並且使用一個圖像處理器工作程序,我的管道工作得很好。我已將100張圖片添加到隊列中,並且我的一臺機器正在愉快地抓取一張圖片,並將數據生成並將結果放入結果隊列中。

問題是,我假設如果我開始另一臺機器上的另一個圖像處理工作,它會簡單地將工作從隊列中拉出。這似乎是rabbitmq站點上「工作隊列」教程中列出的確切情況。我預計這只是工作。然而,真正發生的事情是,無論我開始有多少其他工作人員,他們都會永遠等待,並且永遠不會有任何工作,即使work_queue中有很多消息在等待。

我誤解了什麼?相關代碼隊列服務器上的工作:

from amqplib import client_0_8 as amqp 
conn = amqp.Connection(host="foo:5672 ", userid="pipeline", 
    password="XXXXX", virtual_host="/", insist=False) 
chan = conn.channel() 

.... 

msg = { 'filename': os.path.basename(i) } 

chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='', 
     routing_key='work_queue') 

而且在過程中工人消費端:

from amqplib import client_0_8 as amqp 
conn = amqp.Connection(host="foo:5672 ", userid="pipeline", 
    password="XXXXX", virtual_host="/", insist=False) 
chan = conn.channel() 

def work_callback(msg): 
.... 

while True: 
    chan.basic_consume(callback=work_callback, queue='work_queue') 
    try: 
     chan.wait() 
    except KeyboardInterrupt: 
     print "\nExiting cleanly" 
     sys.exit(0) 

我可以看到其他工人正在連接:

$ sudo rabbitmqctl list_queues 
Listing queues ... 
results 0 
work_queue  246 
...done. 

$ sudo rabbitmqctl list_connections 
Listing connections ... 
pipeline  192.168.8.1  41553 running 
pipeline  XX.YY.ZZ.WW  46676 running 
pipeline  192.168.8.4  44482 running 
pipeline  192.168.8.6  41884 running 
...done. 

哪裏XX.YY.ZZ.WW是一個外部IP。 192.168.8.6的工作人員在隊列中攪動,但外部IP上的工作人員在那裏閒置,而有246個消息在隊列中等待其應該等待的時間。

想法?

+0

迷人的。如果兩個「工人」在我提交工作,然後兩個工作。或者甚至更加狡猾,如果只有一個人在提交工作時被連接,就好像所有的工作都安排在那個工人那裏......如果另一個工人連接,它只能工作IFF我殺死原來的工人......然後新的一個人獲得大約一半的工作?如果我重新連接原來的一個,它會得到剩下的一半?離奇! – clemej

+0

我希望將工作放在一個隊列中,並且每個工作人員連接到隊列並一次抓取第一個條目。它看起來相反,它會得到所有的工作。也許我不明白隊列和渠道? – clemej

回答

0

對不起回答我自己的問題,但我終於得到了我一直在尋找的行爲。我需要將QoS prefetch_count設置爲1.顯然,通過不設置prefetch_count,只要一個客戶端連接,所有200+個消息都被傳送到其「通道」並從隊列中排出,因此除非另有其他人看到任何工作原來通過增加工作斷開(從而關閉通道,並把消息發回隊列

:。

chan.basic_qos(0,1,False) 

我的工人,他們現在一次只搶到一個消息不正是我預計會發生,但其現在的工作正如預期的那樣。

相關問題