2012-05-14 76 views
0

作爲一個簡單的例子中,我加入5個資料轉移到新的RabbitMQ(V 2.6.1)隊列:RabbitMQ/Pika - 保證以創建的順序收到消息?

import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net')) 
channel = connection.channel() 
channel.queue_declare(queue='dw.neil',durable=True) 
# add 5 messages to the queue, the numbers 1-5 
for x in range(5): 
    message = x+1 
    channel.basic_publish(exchange='',routing_key='dw.neil', body=str(message)) 
    print " [x] Sent '%s'" % message 
connection.close() 

予吹掃我的隊列,然後運行上面的代碼添加5項:

[email protected] sports_load_v2$ python send_5.py 
[x] Sent '1' 
[x] Sent '2' 
[x] Sent '3' 
[x] Sent '4' 
[x] Sent '5' 

現在,我試圖模擬失敗的處理。給定以下代碼從隊列中消耗。請注意,我打電話給basic_ack註釋掉了:

#!/usr/bin/env python 
import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net')) 
channel = connection.channel() 
channel.queue_declare(queue='dw.neil',durable=True) 
method_frame, header_frame, body=channel.basic_get(queue='dw.neil') 
print method_frame, header_frame 
print "body: %s" % body 
#channel.basic_ack(delivery_tag=method_frame.delivery_tag) 
connection.close() 

我運行接收代碼以從隊列中獲取項目。正如我所期待的,我拿到項目#1:

[email protected] sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])> 
<BasicProperties([])> 
body: 1 

由於調用channel.basic_ack()被註釋掉了,我希望未確認的消息被放置在隊列,以便下次消費會得到它。我希望消息#1是隊列中的第一條消息,並將Redelivered屬性設置爲True。相反,消息#2接收:

[email protected] sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])> 
<BasicProperties([])> 
body: 2 

和所有在隊列中的其他消息之前#1回來了重新傳遞的標誌設置爲True接收:

...

[email protected] sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])> 
<BasicProperties([])> 
body: 5 

[email protected] sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=True', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])> 
<BasicProperties([])> 
body: 1 

有沒有我可以設置的任何屬性或選項,以便我繼續獲得#1交付,直到它確認?

我的用例是用順序生成的文件加載數據倉庫。我們正在使用基於消息的處理來讓我的程序知道一些新文件已準備好並將被加載到DW中。我們必須按照它們生成的順序來處理這些文件。

+0

編輯:這個問題解決隨着rabbitmq 2.7的發佈。從2.7.0開始,未使用/拒絕的項目將放回到隊列的前面。 –

回答

3

解釋這在RabbitMQ的2.7.0得到解決 - 我們運行2.6.1。

release notes

此版本中的新功能包括:

  • 爲了保存消息重新排隊的消費者
1

嘗試使用channel.basic_reject - 這應該將未確認的消息推送回RabbitMQ,該消息將把消息視爲新消息。另外 - 如果你有一個失敗的消息卡住,你可以使用channel.basic_recover告訴RabbitMQ重新發送所有未確認的消息。

http://www.rabbitmq.com/extensions.html#negative-acknowledgements提供Basic.Reject VS Basic.Nack區別信息。

消息排序語義在http://www.rabbitmq.com/semantics.html

相關問題