2010-05-10 28 views
6

我想發送消息到RabbitMQ服務器,然後等待回覆消息(在「回覆」隊列中)。當然,我不想永遠等待處理這些消息的應用程序停機 - 這需要超時。這聽起來像是一項非常基本的任務,但我找不到辦法做到這一點。我現在遇到了這個問題py-amqplibRabbitMQ .NET client等待一個超時的單個RabbitMQ消息

到目前爲止,我已經得到了最好的解決方案是使用basic_get在兩者之間sleep輪詢,但是這是很醜陋:

def _wait_for_message_with_timeout(channel, queue_name, timeout): 
    slept = 0 
    sleep_interval = 0.1 

    while slept < timeout: 
     reply = channel.basic_get(queue_name) 
     if reply is not None: 
      return reply 

     time.sleep(sleep_interval) 
     slept += sleep_interval 

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout) 

肯定有一些更好的辦法?

回答

8

我剛剛在carrot中增加了對amqplib的超時支持。

這是amqplib.client0_8.Connection一個子類:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multi能夠接收關於信道的任意數量的 一個版本的channel.wait

我想這可能會合並上遊在某些時候。

+1

現在,我稱之爲「偉大的答案」:「它是固定的」!接受 - 希望它*被合併到amqplib中。 – EMP 2010-05-10 23:08:27

+0

@EMP哈哈:)有趣:) – 2013-07-18 10:51:04

1

這似乎打破了異步處理的整個思路,但是如果你肯定我認爲正確的方法是使用RpcClient

+0

雖然RpcClient本身對我來說並不實用,看着它的實現揭示了使用方法:創建一個'QueueingBasicConsumer'和在隊列中等待,這支持超時。這在.NET中並不像我擔心的那樣複雜。 – EMP 2010-05-10 00:57:13

2

有一個例子here使用qpidmsg = q.get(timeout=1)應該做你想做的。對不起,我不知道其他AMQP客戶端庫實現超時(特別是我不知道你提到的兩個具體的)。

+0

看看qpid的源代碼,它似乎使用與.NET客戶端完全相同的方法:'basic_consume'帶有隊列並在隊列中等待超時。看起來這就是我必須要做的。 – EMP 2010-05-10 00:57:57

8

這裏就是我終於實現了在.NET客戶端:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs) 
{ 
    var consumer = new QueueingBasicConsumer(Channel); 
    var tag = Channel.BasicConsume(queueName, true, null, consumer); 
    try 
    { 
     object result; 
     if (!consumer.Queue.Dequeue(timeoutMs, out result)) 
      throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs/1000.0)); 

     return ((BasicDeliverEventArgs)result).Body; 
    } 
    finally 
    { 
     Channel.BasicCancel(tag); 
    } 
} 

不幸的是,我不能做同樣的PY-amqplib,因爲它basic_consume方法不調用回調,除非你打電話channel.wait()channel.wait()不支持超時!這個愚蠢的限制(我不斷遇到)意味着如果你永遠不會收到另一條消息,你的線程將永遠凍結。

1

兔子現在允許您添加超時事件。簡單地包裹你的代碼在嘗試捕捉,然後扔在超時異常和斷開處理程序:

try{ 
    using (IModel channel = rabbitConnection.connection.CreateModel()) 
    { 
     client = new SimpleRpcClient(channel, "", "", queue); 
     client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity 
     client.TimedOut += RpcTimedOutHandler; 
     client.Disconnected += RpcDisconnectedHandler; 
     byte[] replyMessageBytes = client.Call(message); 
     return replyMessageBytes; 
    } 
} 
catch (Exception){ 
    //Handle timeout and disconnect here 
} 
private void RpcDisconnectedHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC disconnect exception occured."); 
} 

private void RpcTimedOutHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC timeout exception occured."); 
}