2016-04-07 41 views
0

我開始使用RabbitMQ,並且在跟隨教程後,我現在試圖按照我需要的方式工作,並且遇到困難。我的設置是我需要能夠首先創建一個RPC,然後根據客戶端將(或不會)發送另一條消息到工作隊列的響應(我不需要響應客戶端)。不幸的是,我努力讓這一切合作似乎沒有按照我想要的方式工作。在服務器端,我有這樣的事情(我已經嘗試了許多變化都具有相同的問題):讓工作隊列和RPC一起工作

var factory = new ConnectionFactory() { HostName = "localhost" }; 
connection = factory.CreateConnection(); 
channel = connection.CreateModel(); 
channel.ExchangeDeclare(exchange: "jobs", type: "direct", durable: true); 

// I started with a named queue, not sure if that's better or worse for this 
var queueName = channel.QueueDeclare().QueueName; 

channel.QueueBind(queue: queueName, 
    exchange: "jobs", 
    routingKey: "saveJob_queue"); 

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 

var consumer = new EventingBasicConsumer(channel); 
consumer.Received += (model, ea) => 
{ 
    // save stuff that was sent with the saveJob_queue routingKey 
} 

channel.BasicConsume(queue: queueName, 
    noAck: false, 
    consumer: consumer); 

// set up channel for RPC 
// Not sure if this has to have another channel, but it wasn't working on the same channel either 
rpcChannel = connection.CreateModel(); 
var rpcQueueName = rpcChannel.QueueDeclare().QueueName; 

rpcChannel.QueueBind(queue: rpcQueueName, 
    exchange: "jobs", 
    routingKey: "rpc_CheckJob_queue"); 

var rpcConsumer = new EventingBasicConsumer(rpcChannel); 

rpcConsumer.Received += (model, ea) => 
{ 
    // do my remote call and send back a response 
} 

我的問題是,發送到jobs交換與路由鍵rpc_CheckJob_queue仍然消息儘管事實上它應該只接收saveJob_queue路由,但在第一個信道上結束觸發Recieved事件。我可以在該處理程序中檢查ea.RoutingKey,並忽略這些消息,但我不明白他們是如何以及爲什麼最終出現在那裏?

設置連接的正確方法是什麼?以便它可以接收工作隊列消息和RPC消息並正確處理它們?

+0

您是否缺少類似'rpcChannel .BasicConsume(rpcQueueName:queueName,'?,如果您可以發送您的代碼發送郵件 – cantSleepNow

回答

0

所以我放棄了這一點,並決定只在Received事件過濾。我認爲問題在於RabbitMQ在頻道上只有Received事件,但隊列沒有。所以Received事件受到任何方式。所以現在我有這個:

channel.QueueDeclare(queue: queueName, 
     durable: true, 
     exclusive: false, 
     autoDelete: false, 
     arguments: null); 

channel.QueueDeclare(queue: rpcQueueName, 
     durable: false, 
     exclusive: false, 
     autoDelete: false, 
     arguments: null); 

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 

var consumer = new EventingBasicConsumer(channel); 
consumer.Received += (model, ea) => 
{ 
    switch (ea.RoutingKey) 
    { 
     case queueName: 
      SaveJob(ea); 
      break; 
     case rpcQueueName: 
      CheckJob(ea); 
      break; 
    } 
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 
}; 

channel.BasicConsume(queue: queueName, 
    noAck: false, 
    consumer: consumer); 

channel.BasicConsume(queue: rpcQueueName, 
        noAck: false, 
        consumer: consumer); 

我打開更好的建議,因爲這似乎有點關閉。

所以送就是:

var properties = channel.CreateBasicProperties(); 
properties.Persistent = true; 

channel.BasicPublish(exchange: "", 
        routingKey: queueName, 
        basicProperties: properties, 
        body: body); 

的經常性工作任務和:

var corrId = Guid.NewGuid().ToString(); 
var props = channel.CreateBasicProperties(); 
props.ReplyTo = replyQueueName; 
props.CorrelationId = corrId; 

var messageBytes = Encoding.UTF8.GetBytes(msg); 
channel.BasicPublish(exchange: "", 
        routingKey: rpcQueueName, 
        basicProperties: props, 
        body: messageBytes); 

while (true) 
{ 
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
    if (ea.BasicProperties.CorrelationId == corrId) 
    { 
     return ea.Body != null && ea.Body.Any() ? BitConverter.ToInt32(ea.Body,0) : (int?)null; 
    } 
} 

對於RPC。

0

由於您沒有爲您的隊列指定名稱,我懷疑您獲得了兩次相同的隊列。所以我認爲發生的事情本質上是這樣的。

工作 - > saveJob_queue - > SomeSystemQueue
工作 - > rpc_CheckJob_queue - > SomeSystemQueue

請嘗試選擇兩個獨立的隊列名稱,然後再次運行代碼。 因此,而不是這樣的:

var queueName = channel.QueueDeclare().QueueName; 

channel.QueueBind(queue: queueName, 
    exchange: "jobs", 
    routingKey: "saveJob_queue"); 

有:

var name = "Queue A"; 
channel.QueueDeclare(name); 
channel.QueueBind(queue: queueName, 
     exchange: "jobs", 
     routingKey: "saveJob_queue"); 

然後別的命名你的第二個隊列的東西和嘗試。

+0

我原來確實有兩個隊列命名(當然不同),但有相同的問題 –

+0

好的,你可能想要切換回去,只是爲了完全排除這種情況。你能不能發佈你的發送邏輯?只是爲了讓每個人都能看到你正在尋址哪個路由密鑰等等。 –