2011-11-09 44 views
2

我正在嘗試通讀ActiveMQ消息並根據一些過濾器來處理其中的一些消息或將其他消息保留在隊列中。 我使用NMS API用下面的代碼:在不刪除的情況下讀取ActiveMQ消息

 Uri connecturi = new Uri("activemq:tcp://model.net:61616"); 
     IConnectionFactory factory = new NMSConnectionFactory(connecturi); 
     List<ModelBuilderBase> result = new List<ModelBuilderBase>(); 
     using (IConnection connection = factory.CreateConnection()) 
     using (ISession session = connection.CreateSession()) 
     { 
      IDestination destination = SessionUtil.GetDestination(session, "queue://cidModelbuilderQ"); 
      using (IMessageConsumer consumer = session.CreateConsumer(destination)) 
      { 
       connection.Start(); 
       ITextMessage message; 
       while ((message = consumer.ReceiveNoWait() as ITextMessage) != null) 
       { 
        if (message.Properties[MANDATOR] == null || message.Properties[REFCODE] == null) 
         continue; 
        var mandator = message.Properties[MANDATOR].ToString(); 
        var refCode = message.Properties[REFCODE].ToString(); 
        result.Add(ModelBuilderFactory.Instance.GetInstance(refCode, mandator)); 
       } 
      } 

問題是,在接收到消息後,消息被刪除。我可以以某種方式改變這種行爲,並在成功處理後手動刪除消息嗎? }

回答

3

創建一個QueueBrowser對象來查看指定隊列上的消息。對消息執行邏輯操作,然後創建QueueReceiver對象以接收來自指定隊列的消息。

3

雖然這是不容易寫一個工作代碼,多虧了ARSS答案我現在這個工作的解決方案:

 Uri connecturi = new Uri("activemq:tcp://model.net:61616"); 
     IConnectionFactory factory = new NMSConnectionFactory(connecturi); 
     List<ModelBuilderBase> result = new List<ModelBuilderBase>(); 
     using (IConnection connection = factory.CreateConnection()) 
     using (ISession session = connection.CreateSession()) 
     { 

      IDestination destination = SessionUtil.GetDestination(session, "queue://cidModelbuilderQ"); 
      using (IMessageConsumer consumer = session.CreateConsumer(destination)) 
      { 
       connection.Start(); 
       var q = session.GetQueue("cidModelbuilderQ"); 
       var b = session.CreateBrowser(q); 
       var msgs = b.GetEnumerator(); 
       while (msgs.MoveNext()) 
       { 
        ITextMessage message = msgs.Current as ITextMessage; 
        if (message.Properties[MANDATOR] == null || message.Properties[REFCODE] == null) 
         continue; 
        var mandator = message.Properties[MANDATOR].ToString(); 
        var refCode = message.Properties[REFCODE].ToString(); 
        result.Add(ModelBuilderFactory.Instance.GetInstance(refCode, mandator)); 
       } 
      } 
     } 
相關問題