我想創建一個Python中的RabbitMQ接收器/消費者,我不知道如何檢查消息。我正在嘗試在自己的循環中執行此操作,而不是使用pika中的回調函數。使用Pika客戶端輪詢RabbitMQ消息
如果我理解的東西,在Java客戶端,我可以使用getBasic()
來檢查是否有任何消息沒有阻塞。我不介意在獲取消息時阻止,但我不想阻塞,直到有消息。
我沒有找到任何明確的例子,還沒有找到相應的pika中的調用。
我想創建一個Python中的RabbitMQ接收器/消費者,我不知道如何檢查消息。我正在嘗試在自己的循環中執行此操作,而不是使用pika中的回調函數。使用Pika客戶端輪詢RabbitMQ消息
如果我理解的東西,在Java客戶端,我可以使用getBasic()
來檢查是否有任何消息沒有阻塞。我不介意在獲取消息時阻止,但我不想阻塞,直到有消息。
我沒有找到任何明確的例子,還沒有找到相應的pika中的調用。
如果你想同步做到這一點,那麼你就需要看一下鼠兔BlockingConnection
的BlockingConnection上創建的鼠兔的異步 核心providng方法之上的一層,這將阻止,直到他們的預期響應 有回。由於Basic.Deliver和 Basic.Return調用RabbitMQ到您的應用程序,您仍然需要 來實現繼續傳遞樣式異步方法 如果您想要使用basic_consume或 從RabbitMQ接收消息如果您希望在使用 basic_publish時收到傳送失敗的通知。
更多信息,在這裏一個例子
https://pika.readthedocs.org/en/0.9.12/connecting.html#blockingconnection
隊列處理循環可以反覆的幫助下完成的例子定期檢查隊列大小process_data_events()
:
import pika
# A stubborn callback that still wants to be in the code.
def mq_callback(ch, method, properties, body):
print(" Received: %r" % body)
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
queue_state = channel.queue_declare(queue="test")
# Configure a callback.
channel.basic_consume(mq_callback, queue="test")
try:
# My own loop here:
while(True):
# Do other processing
# Process message queue events, returning as soon as possible.
# Issues mq_callback() when applicable.
connection.process_data_events(time_limit=0)
finally:
connection.close()
看來, 'queue.method.message_count'只在'queue_declare()'調用中更新。 雖然還有'get_waiting_message_count()',但在沒有消耗循環的情況下使用時總是返回0。因此,多次調用'queue_declare()'似乎是仍然與API兼容的唯一解決方案。 – Sussch 2016-12-10 20:54:07
也從API中找到'process_data_events()'。 – Sussch 2016-12-10 21:31:52
這是正確的。您必須執行被動隊列聲明才能獲取當前消息計數。 – mike 2016-12-12 09:51:24