2017-02-14 82 views
0

。我用EventingBasicConsumer交易所QueueBind。爲了測試,我運行我的應用程序以接收消息,然後關閉我的以太網控制器。之後,我打開receaving messges再次開始,但約500條消息是丟失。 我的代碼:的RabbitMQ EventingBasicConsumer失去我正在開發使用兔MQ消息應用程序的消息

private void DoWork() 
    { 
     try 
     { 
      var connection = ConnectionFactory.CreateConnection(); 
      IModel model = connection.CreateModel(); 

      // Configure the Quality of service for the model. Below is how what each setting means. 
      // BasicQos(0="Dont send me a new message untill I’ve finshed", 1= "Send me one message at a time", false ="Apply to this Model only") 
      model.BasicQos(0, 1, false); 

      model.ExchangeDeclare(Options.RabbitConnectionOptions.Exchange, RabbitConstants.ExchangeType, RabbitConstants.ExchangeDurable, RabbitConstants.ExchangeAutoDelete); 
      var queueDeclareOk = model.QueueDeclare("SomeSubsruberQueue3", RabbitConstants.QueueDurable, RabbitConstants.QueueExclusive, RabbitConstants.ExchangeAutoDelete); 


      var queueName = queueDeclareOk.QueueName; 

      foreach (var optionsBindingKey in Options.BindingKeys) 
      { 
       Logger.Debug($"QueueBind for {nameof(optionsBindingKey)}={optionsBindingKey}"); 
       model.QueueBind(queueName, Options.RabbitConnectionOptions.Exchange, optionsBindingKey); 
      } 


      var consumer = new EventingBasicConsumer(model); 

      consumer.Received += (ch, ea) => 
      { 
       try 
       { 
        var body = ea.Body; 
        var message = Encoding.UTF8.GetString(body); 
        var routingKey = ea.RoutingKey; 
        var messageToLog = $" [x] Received '{routingKey}':'{message}'";       

        Logger.Info(messageToLog); 
        Console.WriteLine($"receavedCount={++receavedCount}"); 

       } 
       catch (Exception e) 
       { 
        Console.WriteLine(e); 
        throw; 
       } 
      }; 

      model.BasicConsume(queueName, RabbitConstants.QueueAutoAck, consumer); 
     } 
     catch (Exception e) 
     { 
      Logger.Error(e); 
      Console.WriteLine(e); 
      throw; 
     } 
    } 

如何避免失去消息?

附:這裏的日誌顯示丟失的消息。 「ID」 是連續的,這樣你們可以看到,有ID的按摩,從926到1299丟失

2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="922" Name="1c" /> 
2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="923" Name="1c" /> 
2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="924" Name="1c" /> 
2017-02-14 10:52:01.5056 <Root><Subscribers><Subscriber Id="925" Name="1c" /> 
2017-02-14 10:52:22.5606 <Root><Subscribers><Subscriber Id="1300" Name="1c" /> 
2017-02-14 10:52:22.5606 <Root><Subscribers><Subscriber Id="1301" Name="1c" /> 
2017-02-14 10:52:22.5676 <Root><Subscribers><Subscriber Id="1302" Name="1c" /> 
2017-02-14 10:52:22.6046 <Root><Subscribers><Subscriber Id="1303" Name="1c" /> 

UPD:

如果我修改:

model.BasicConsume(queueName,true, consumer); 

model.BasicConsume(queueName,false, consumer); 

並使用明確的ASK

((EventingBasicConsumer)ch).Model.BasicAck(ea.DeliveryTag, false); 

我有很奇怪的行爲:

2017-02-14 13:06:35.9835| DeliveryTag=23, <Subscriber Id="778" Name="1c" /> 
2017-02-14 13:06:36.0295| DeliveryTag=24, <Subscriber Id="779" Name="1c" /> 
2017-02-14 13:06:57.3285| DeliveryTag=26, <Subscriber Id="782" Name="1c" /> 
2017-02-14 13:06:57.3755| DeliveryTag=27, <Subscriber Id="783" Name="1c" /> 

沒有 DeliveryTag = 25!

+0

鬆散=它鬆鬆垮垮,失去=丟失..... – BugFinder

回答

2

請勿使用自動應答。閱讀一條消息,做任何你想做的事情,然後當你完成它時明確地確認。

0

Ок,感謝埃米爾Vikström,我必須用明確的ASK:

model.BasicConsume(queueName,false, consumer); 

和處理消息後,可能會問:

((EventingBasicConsumer)ch).Model.BasicAck(ea.DeliveryTag, false); 

有關丟失信息的其他問題:它返回時的RabbitMQ重新排序消息隊列。所以它不是洛杉磯,它只是稍後會收回