2013-02-28 74 views
2

我想創建一個Python中的RabbitMQ接收器/消費者,我不知道如何檢查消息。我正在嘗試在自己的循環中執行此操作,而不是使用pika中的回調函數。使用Pika客戶端輪詢RabbitMQ消息

如果我理解的東西,在Java客戶端,我可以使用getBasic()來檢查是否有任何消息沒有阻塞。我不介意在獲取消息時阻止,但我不想阻塞,直到有消息。

我沒有找到任何明確的例子,還沒有找到相應的pika中的調用。

回答

1

如果你想同步做到這一點,那麼你就需要看一下鼠兔BlockingConnection

的BlockingConnection上創建的鼠兔的異步 核心providng方法之上的一層,這將阻止,直到他們的預期響應 有回。由於Basic.Deliver和 Basic.Return調用RabbitMQ到您的應用程序,您仍然需要 來實現繼續傳遞樣式異步方法 如果您想要使用basic_consume或 從RabbitMQ接收消息如果您希望在使用 basic_publish時收到傳送失敗的通知。

更多信息,在這裏一個例子

https://pika.readthedocs.org/en/0.9.12/connecting.html#blockingconnection

1

可以使用這個答案Get Queue Size in Pika (AMQP Python)

+0

看來, 'queue.method.message_count'只在'queue_declare()'調用中更新。 雖然還有'get_waiting_message_count()',但在沒有消耗循環的情況下使用時總是返回0。因此,多次調用'queue_declare()'似乎是仍然與API兼容的唯一解決方案。 – Sussch 2016-12-10 20:54:07

+0

也從API中找到'process_data_events()'。 – Sussch 2016-12-10 21:31:52

+1

這是正確的。您必須執行被動隊列聲明才能獲取當前消息計數。 – mike 2016-12-12 09:51:24

1

隊列處理循環可以反覆的幫助下完成的例子定期檢查隊列大小process_data_events()

import pika 

# A stubborn callback that still wants to be in the code. 
def mq_callback(ch, method, properties, body): 
    print(" Received: %r" % body) 

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) 
channel = connection.channel() 
queue_state = channel.queue_declare(queue="test") 
# Configure a callback. 
channel.basic_consume(mq_callback, queue="test") 

try: 
    # My own loop here: 
    while(True): 
     # Do other processing 

     # Process message queue events, returning as soon as possible. 
     # Issues mq_callback() when applicable. 
     connection.process_data_events(time_limit=0) 
finally: 
    connection.close() 
相關問題