2015-11-13 148 views
0

我有一個rabbitmq消費者應用程序實現「.net中的發佈/訂閱模式,它完美地作爲控制檯應用程序運行,但是當我將它部署爲Windows服務時,它似乎不會保存數據到MongoDB的。RabbitMQ消費者作爲Windows服務

protected override void OnStart(string[] args) 
    { 
     try 
     { 
      var connectionString = "mongodb://localhost"; 
      var client = new MongoClient(connectionString); 
      var factory = new ConnectionFactory() { HostName = "localhost" };    
      using (var connection = factory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        channel.ExchangeDeclare(exchange: "test", type: "fanout"); 
        var queueName = channel.QueueDeclare().QueueName; 
        channel.QueueBind(queue: queueName,          exchange: "logs", routingKey: ""); 

        var consumer = new EventingBasicConsumer(channel); 
        consumer.Received += (model, ea) => 
        { 
         var body = ea.Body; 
         var message = Encoding.UTF8.GetString(body); 
         BsonDocument document = BsonDocument.Parse(message); 
         var database = client.GetDatabase("test"); 
         var collection = database.GetCollection<BsonDocument>("test_collection"); 
         collection.InsertOneAsync(document); 
        }; 
        channel.BasicConsume(queue: queueName,          noAck: true,consumer: consumer); 

       } 
      } 
     } 
     catch (Exception ex) 
     { 
      throw; 
     } 
    } 

有什麼我失蹤?

+0

你檢查了日誌嗎? – Gabriele

+0

你不是在等待InsertOneAsync的結果......任何事情都可能發生,你永遠不會知道......使用collection.InsertOneAsync(document).GetAwaiter()。GetResult(); –

+0

@Gabriele我確實嘗試了日誌記錄以查看消息是否真的被接收。但看起來不像。 – sandy

回答

3

在OnStart()中忙於等待是一個糟糕的主意,因爲操作系統會期待從它返回。在這裏閱讀:https://msdn.microsoft.com/en-us/library/zt39148a%28v=vs.110%29.aspx

編輯:上面的代碼的問題是,你有你的連接和頻道在using語句。這樣做的全部意義在於將它們解決一次,超出範圍。因此,在這種情況下,即使您添加了事件處理程序,您在退出範圍並處理通道等操作之後不久,要解決此問題,請將連接,通道和使用者從「OnStart」方法中拉出並讓他們成爲班級(可能是私人)成員。即使您退出該方法並且您的活動應該繼續收聽,也應該保持其打開狀態。

0

做以下修改到我的OnStart方法奏效了

protected override void OnStart(string[] args) 
    { 

     ConnectionFactory factory = new ConnectionFactory { HostName = localhost" }; 
     var connectionString = "mongodb://localhost"; 
     var client = new MongoClient(connectionString); 


     using (IConnection connection = factory.CreateConnection()) 
     { 
      using (IModel channel = connection.CreateModel()) 
      { 
       channel.ExchangeDeclare(exchange: "test", type: "fanout"); 

       string queueName = channel.QueueDeclare(); 

       channel.QueueBind(queueName, "test", ""); 

       this.EventLog.WriteEntry("Waiting for messages"); 

       QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 
       channel.BasicConsume(queueName, true, consumer); 

       while (true) 
       { 
        BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
        var message = Encoding.UTF8.GetString(e.Body); 
        BsonDocument document = BsonDocument.Parse(message); 
        var database = client.GetDatabase("test"); 
        var collection = database.GetCollection<BsonDocument>("test_collection"); 
        collection.InsertOneAsync(document); 

       } 
      } 
     } 
    } 
+1

此服務如何處理停止?即它會停止嗎?是否有必要殺死它,因爲a)while循環沒有條件,並且b)調用consumer.Queue.Dequeue()而不是阻塞? – dabs