有很多方法可以做到這一點。例如,您可以用ManualResetEvent
一起使用EventingBasicConsumer
,像這樣的(這只是用於演示目的 - 更好地利用以下方法之一):
var factory = new ConnectionFactory();
using (var connection = factory.CreateConnection()) {
using (var channel = connection.CreateModel()) {
// setup signal
using (var signal = new ManualResetEvent(false)) {
var consumer = new EventingBasicConsumer(channel);
byte[] messageBody = null;
consumer.Received += (sender, args) => {
messageBody = args.Body;
// process your message or store for later
// set signal
signal.Set();
};
// start consuming
channel.BasicConsume("your.queue", false, consumer);
// wait until message is received or timeout reached
bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
// cancel subscription
channel.BasicCancel(consumer.ConsumerTag);
if (timeout) {
// timeout reached - do what you need in this case
throw new Exception("timeout");
}
// at this point messageBody is received
}
}
}
正如你在註釋中規定 - 如果你希望在同一個隊列多個消息,這不是最好的方法。那麼這不是最好的方法,我只是爲了演示ManualResetEvent
在案例庫本身不提供超時支持的情況下使用它。
如果您正在進行RPC(遠程過程調用,請求 - 答覆) - 您可以在服務器端使用SimpleRpcClient
和SimpleRpcServer
。客戶端看起來就像這樣:
var client = new SimpleRpcClient(channel, "your.queue");
client.TimeoutMilliseconds = 10 * 1000;
client.TimedOut += (sender, args) => {
// do something on timeout
};
var reply = client.Call(myMessage); // will return reply or null if timeout reached
更簡單的方法:使用基本Subscription
類(它使用內部同樣EventingBasicConsumer
,但支持超時,所以你不必自己實現),像這樣:
var sub = new Subscription(channel, "your.queue");
BasicDeliverEventArgs reply;
if (!sub.Next(10 * 1000, out reply)) {
// timeout
}
來源
2017-05-31 10:04:58
Evk
第一個解決方案無效。 BasicConsume不能保證在BasicCancel上停止使用,它可以稍後做這個,因爲它實際上是在執行rabbit(只是嘗試每個請求使用一條消息,並且在某些情況下,您會看到幾次分配messageBody)。您仍然需要重發多餘的消息。對於第二個和第三個,我會嘗試現在=) – eocron
雖然你的上半部分是無關的,訂閱類正是我想要的!謝謝,它的作品奇妙!你能編輯答案,以便其他人知道最後一個解決了嗎? – eocron
不過,它通過實現存儲了一堆消息,而我只需要一個=/ – eocron