2015-07-12 51 views
11

我想知道爲什麼我的RabbitMQ RPC-客戶端在重啓後總是處理死信息。 _channel.QueueDeclare(queue, false, false, false, null);應該禁用緩衝區。如果我在RPC客戶端內部過載了QueueDeclare,我無法連接到服務器。這裏有什麼不對嗎?任何想法如何解決這個問題?RabbitMQ持久隊列不起作用(RPC-Server,RPC-Client)


RPC-服務器

new Thread(() => 
{ 
    var factory = new ConnectionFactory { HostName = _hostname }; 
    if (_port > 0) 
     factory.Port = _port; 
    _connection = factory.CreateConnection(); 
    _channel = _connection.CreateModel(); 

    _channel.QueueDeclare(queue, false, false, false, null); 
    _channel.BasicQos(0, 1, false); 
    var consumer = new QueueingBasicConsumer(_channel); 
    _channel.BasicConsume(queue, false, consumer); 
    IsRunning = true; 
    while (IsRunning) 
    { 
     BasicDeliverEventArgs ea; 
     try { 
      ea = consumer.Queue.Dequeue(); 
     } 
     catch (Exception ex) { 
      IsRunning = false; 
     } 
     var body = ea.Body; 
     var props = ea.BasicProperties; 
     var replyProps = _channel.CreateBasicProperties(); 
     replyProps.CorrelationId = props.CorrelationId; 

     var xmlRequest = Encoding.UTF8.GetString(body); 

     var messageRequest = XmlSerializer.DeserializeObject(xmlRequest, typeof(Message)) as Message; 
     var messageResponse = handler(messageRequest); 

     _channel.BasicPublish("", props.ReplyTo, replyProps, 
           messageResponse); 
     _channel.BasicAck(ea.DeliveryTag, false); 
    } 
}).Start(); 

RPC客戶端

public void Start() 
{ 
    if (IsRunning) 
     return; 
    var factory = new ConnectionFactory { 
     HostName = _hostname, 
     Endpoint = _port <= 0 ? new AmqpTcpEndpoint(_endpoint) 
           : new AmqpTcpEndpoint(_endpoint, _port) 
    }; 
    _connection = factory.CreateConnection(); 
    _channel = _connection.CreateModel(); 
    _replyQueueName = _channel.QueueDeclare(); // Do not connect any more 
    _consumer = new QueueingBasicConsumer(_channel); 
    _channel.BasicConsume(_replyQueueName, true, _consumer); 
    IsRunning = true; 
} 

public Message Call(Message message) 
{ 
    if (!IsRunning) 
     throw new Exception("Connection is not open."); 
    var corrId = Guid.NewGuid().ToString().Replace("-", ""); 
    var props = _channel.CreateBasicProperties(); 
    props.ReplyTo = _replyQueueName; 
    props.CorrelationId = corrId; 

    if (!String.IsNullOrEmpty(_application)) 
     props.AppId = _application; 

    message.InitializeProperties(_hostname, _nodeId, _uniqueId, props); 

    var messageBytes = Encoding.UTF8.GetBytes(XmlSerializer.ConvertToString(message)); 
    _channel.BasicPublish("", _queue, props, messageBytes); 

    try 
    { 
     while (IsRunning) 
     { 
      var ea = _consumer.Queue.Dequeue(); 
      if (ea.BasicProperties.CorrelationId == corrId) 
      { 
       var xmlResponse = Encoding.UTF8.GetString(ea.Body); 
       try 
       { 
        return XmlSerializer.DeserializeObject(xmlResponse, typeof(Message)) as Message; 
       } 
       catch(Exception ex) 
       { 
        IsRunning = false; 
        return null; 
       } 
      } 
     } 
    } 
    catch (EndOfStreamException ex) 
    { 
     IsRunning = false; 
     return null; 
    } 
    return null; 
} 

回答

6

嘗試DeliveryMode屬性設置爲您的RPC的非持久性(1)客戶端代碼如下:

public Message Call(Message message) 
{ 
    ... 
    var props = _channel.CreateBasicProperties(); 
    props.DeliveryMode = 1; //you might want to do this in your RPC-Server as well 
    ... 
} 

AMQP Model Explained包含非常有用的資源,例如解釋如何處理以死信隊列結束的消息。

從文檔中另一個有用注意到關於排隊耐久性:

耐用隊列保存在磁盤上,因而倖免代理重新啓動。 不耐用的隊列稱爲瞬態。並非所有場景 和用例強制隊列都是持久的。

隊列的持久性不會使路由到該 隊列的消息持久。如果代理被取下然後重新啓動, 持久性隊列將在代理啓動期間重新聲明,但是,只有 持久性消息將被恢復。

注意,它談論經紀人重啓沒有出版商或消費者的重新啓動。

+0

幫忙@ MR.ABC嗎? – 2015-07-20 14:17:15