作爲一個簡單的例子中,我加入5個資料轉移到新的RabbitMQ(V 2.6.1)隊列:RabbitMQ/Pika - 保證以創建的順序收到消息?
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
# add 5 messages to the queue, the numbers 1-5
for x in range(5):
message = x+1
channel.basic_publish(exchange='',routing_key='dw.neil', body=str(message))
print " [x] Sent '%s'" % message
connection.close()
予吹掃我的隊列,然後運行上面的代碼添加5項:
[email protected] sports_load_v2$ python send_5.py
[x] Sent '1'
[x] Sent '2'
[x] Sent '3'
[x] Sent '4'
[x] Sent '5'
現在,我試圖模擬失敗的處理。給定以下代碼從隊列中消耗。請注意,我打電話給basic_ack註釋掉了:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
method_frame, header_frame, body=channel.basic_get(queue='dw.neil')
print method_frame, header_frame
print "body: %s" % body
#channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection.close()
我運行接收代碼以從隊列中獲取項目。正如我所期待的,我拿到項目#1:
[email protected] sports_load_v2$ python r5.py
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1
由於調用channel.basic_ack()被註釋掉了,我希望未確認的消息被放置在隊列,以便下次消費會得到它。我希望消息#1是隊列中的第一條消息,並將Redelivered屬性設置爲True。相反,消息#2接收:
[email protected] sports_load_v2$ python r5.py
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 2
和所有在隊列中的其他消息之前#1回來了重新傳遞的標誌設置爲True接收:
...
[email protected] sports_load_v2$ python r5.py
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 5
[email protected] sports_load_v2$ python r5.py
<Basic.GetOk(['message_count=9', 'redelivered=True', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1
有沒有我可以設置的任何屬性或選項,以便我繼續獲得#1交付,直到它確認?
我的用例是用順序生成的文件加載數據倉庫。我們正在使用基於消息的處理來讓我的程序知道一些新文件已準備好並將被加載到DW中。我們必須按照它們生成的順序來處理這些文件。
編輯:這個問題解決隨着rabbitmq 2.7的發佈。從2.7.0開始,未使用/拒絕的項目將放回到隊列的前面。 –