2014-07-08 62 views
0

我已經註冊爲消息隊列處理程序 ServiceStack服務。當服務處理消息並遇到錯誤時,我想從隊列中提取剩餘的消息並將它們發送到不同的隊列。這是我正在嘗試使用的代碼。使用ServiceStack和RabbitMQ的從一個隊列將消息發送到另一個

if (error) 
{ 
    using (var mqClient = TryResolve<IMessageFactory>().CreateMessageQueueClient()) 
    { 
     var callMessage = mqClient.Get<CallMessage>(QueueNames<CallMessage>.In); 
     while (callMessage != null) 
     { 
     mqClient.Ack(callMessage); 
     PublishMessage(new TextMessage { Text = callMessage.Text }); 

     // Get the next message. Null is returned when the queue is empty 
     callMessage = mqClient.Get<CallMessage>(QueueNames<CallMessage>.In); 
     } 
    } 
} 

然而,在調用mqClient.Get的CallMessage隊列似乎變得有活性UNACKED消息陷入僵局。

enter image description here

什麼是從隊列中提取消息,並將它們重新發布到不同的隊列中的正確方法?

回答

1

您想改爲使用IMessageQueueClient.GetAsync從隊列中檢索消息,該消息將返回下一條消息,如果沒有更多消息未決,則返回null

IMessageQueueClient.Get是一個同步阻塞獲取,將阻塞,直到它收到一條消息或可選的超時已過,如果沒有給出超時它將永遠阻塞,直到它收到一條消息。

+0

謝謝@mythz使用GetAsync解決了與獲得阻塞的問題,但它總是返回null。我發現隊列中的所有消息已經在RabbitMq的'Unacked'狀態中出列,所以空結果是有意義的。 ServiceStack是否將消息作爲批處理出隊?我註冊使用handler 「mqServer.RegisterHandler (ServiceController.ExecuteMessage);」 – jacksonakj

+0

@jacksonakj它僅在從底層C#的RabbitMQ客戶端時間拉出來1,但它確實指定[20 prefetchCount](https://github.com/ServiceStack/ServiceStack/blob/master/src/ServiceStack。 RabbitMq/RabbitMqProducer.cs#L40)。 – mythz

相關問題