假設我有一個名爲notifications的隊列,並且發送1000條消息到該隊列。我也有20位消費者。我希望每個消費者從隊列中取出一條消息,對其進行處理,然後獲取下一條可用消息。ActiveMQ - 消費者不共享負載
現在發生的情況是,一些消費者抓住了大量的消息並對其進行處理,而其他消費者什麼也不做。
下面是一個完整的測試用例,演示了我所看到的。在現實中,消費者在不同計算機上的所有獨立的過程,但是這正是複製行爲:
using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;
namespace QueueTest
{
class Program
{
private static object _syncObj = new object();
private static int _row;
static void Main()
{
var connectionFactory = new ConnectionFactory("tcp://localhost:61616");
var template = new NmsTemplate(connectionFactory);
for (int i = 0; i < 500; i++)
{
template.ConvertAndSend("notifications", "hello");
}
for (int i = 0; i < 10; i++)
{
ThreadPool.QueueUserWorkItem(Spawn, i);
}
Console.ReadKey();
}
static void Spawn(object o)
{
int count = 0;
int threadId = (int)o;
var ts = new TimeSpan(0,0,0,5);
while (true)
{
var connectionFactory = new ConnectionFactory(
"tcp://localhost:61616"
+ "?nms.PrefetchPolicy.QueuePrefetch=1",
String.Format("{0}:{1}:{2}",
Environment.MachineName,
threadId,
"notifications"));
using (var conn = connectionFactory.CreateConnection())
{
conn.ClientId = String.Format("{0}:{1}:{2}", Environment.MachineName, threadId, "notifications");
conn.Start();
using (var session = conn.CreateSession())
{
var queue = session.GetQueue("notifications");
using (var consumer = session.CreateConsumer(queue))
{
IMessage msg = consumer.Receive(ts);
while (msg != null)
{
lock (_syncObj)
{
Interlocked.Increment(ref _row);
Console.WriteLine("{0}. {1} processed {2}", _row, threadId, ++count);
if (_row == 500)
{
Environment.Exit(0);
}
}
Thread.Sleep(1000);
msg = consumer.Receive(ts);
}
Console.WriteLine("{0} nothing to process", threadId);
}
}
conn.Stop();
}
}
}
}
}
我如何得到它更均勻地攤開消息發送給所有的消費者?
我試了一下prefetchPolicy的36種= 1,包括你那裏,啥子consumer.prefetchPolicy = 1和consumer.prefetchPolicy = 0,jms.prefetchPolicy = 1等。它們都表現出相同的行爲:第一個XX消費者數量提供所有的消息。 – ConsultUtah 2011-04-05 22:39:21
看起來您忽略了.QueuePrefetch部分,您無法設置整個策略,查看src/test/csharp文件夾中的NMSConnectionFactoryTest.cs文件,它具有設置預取限制的測試用例。在v1.5.1版本中有一個更新,允許您使用類似下面的方式設置策略中的所有內容:nms.PrefetchPolicy.All = 1 – 2011-04-06 11:41:11