2011-04-01 66 views
0

假設我有一個名爲notifications的隊列,並且發送1000條消息到該隊列。我也有20位消費者。我希望每個消費者從隊列中取出一條消息,對其進行處理,然後獲取下一條可用消息。ActiveMQ - 消費者不共享負載

現在發生的情況是,一些消費者抓住了大量的消息並對其進行處理,而其他消費者什麼也不做。

下面是一個完整的測試用例,演示了我所看到的。在現實中,消費者在不同計算機上的所有獨立的過程,但是這正是複製行爲:

using System; 
using System.Threading; 
using Apache.NMS; 
using Apache.NMS.ActiveMQ; 
using Spring.Messaging.Nms.Core; 

namespace QueueTest 
{ 
    class Program 
    { 
     private static object _syncObj = new object(); 
     private static int _row; 

     static void Main() 
     { 
      var connectionFactory = new ConnectionFactory("tcp://localhost:61616"); 
      var template = new NmsTemplate(connectionFactory); 

      for (int i = 0; i < 500; i++) 
      { 
       template.ConvertAndSend("notifications", "hello"); 
      } 

      for (int i = 0; i < 10; i++) 
      { 
       ThreadPool.QueueUserWorkItem(Spawn, i); 
      } 

      Console.ReadKey(); 
     } 

     static void Spawn(object o) 
     { 
      int count = 0; 
      int threadId = (int)o; 

      var ts = new TimeSpan(0,0,0,5); 
      while (true) 
      { 
       var connectionFactory = new ConnectionFactory(
           "tcp://localhost:61616" 
           + "?nms.PrefetchPolicy.QueuePrefetch=1", 
           String.Format("{0}:{1}:{2}", 
       Environment.MachineName, 
       threadId, 
       "notifications")); 
       using (var conn = connectionFactory.CreateConnection()) 
       { 
        conn.ClientId = String.Format("{0}:{1}:{2}", Environment.MachineName, threadId, "notifications"); 
        conn.Start(); 

        using (var session = conn.CreateSession()) 
        { 
         var queue = session.GetQueue("notifications"); 
         using (var consumer = session.CreateConsumer(queue)) 
         { 
          IMessage msg = consumer.Receive(ts); 
          while (msg != null) 
          { 
           lock (_syncObj) 
           { 
            Interlocked.Increment(ref _row); 
            Console.WriteLine("{0}. {1} processed {2}", _row, threadId, ++count); 
            if (_row == 500) 
            { 
             Environment.Exit(0); 
            } 
           } 
           Thread.Sleep(1000); 
           msg = consumer.Receive(ts); 
          } 
          Console.WriteLine("{0} nothing to process", threadId); 
         } 
        } 
        conn.Stop(); 
       } 
      } 
     } 
    } 
} 

我如何得到它更均勻地攤開消息發送給所有的消費者?

回答

0

我設置的賞金,我想通了問題... :-(

我使用NMS的舊版本不支持預取的政策。

1

該問題圍繞着消費者的預取緩衝區。默認情況下,使用者將從代理預取大約1000條消息。您需要爲消費者設置預取,以便他們允許其他消費者獲取一些消息,在這種情況下,您可能需要將預取設置爲一個,以便每位消費者均勻分擔負載。看看NMS.ActiveMQ API中的預取策略。

你可以設置連接URI預讀,它看起來像下面TCP://本地主機:61616 nms.PrefetchPolicy.QueuePrefetch = 1

右後
+0

我試了一下prefetchPolicy的36種= 1,包括你那裏,啥子consumer.prefetchPolicy = 1和consumer.prefetchPolicy = 0,jms.prefetchPolicy = 1等。它們都表現出相同的行爲:第一個XX消費者數量提供所有的消息。 – ConsultUtah 2011-04-05 22:39:21

+0

看起來您忽略了.QueuePrefetch部分,您無法設置整個策略,查看src/test/csharp文件夾中的NMSConnectionFactoryTest.cs文件,它具有設置預取限制的測試用例。在v1.5.1版本中有一個更新,允許您使用類似下面的方式設置策略中的所有內容:nms.PrefetchPolicy.All = 1 – 2011-04-06 11:41:11