2
我在.net中使用RabbitMQ,當我在隊列中放置100條消息時,看到一個奇怪的問題。它處理約50條消息,然後Dequeue()
方法只是掛起。如果我重新啓動服務,它會處理剩餘的項目。RabbitMQ只處理50條消息,然後阻止
編輯:它正在處理完全50%的隊列。當我添加1000條消息時,它只處理500條。即使當單線程時
我在這裏丟失了什麼?
private void InitializeAgent() {
var agentFactory = new ConnectionFactory() { HostName = "localhost" };
agentConnection = agentFactory.CreateConnection();
agentChannel = agentConnection.CreateModel();
var ok = agentChannel.QueueDeclare(GetType().Name, true, false, false, null);
consumer = new QueueingBasicConsumer(agentChannel);
agentChannel.BasicConsume(GetType().Name, false, consumer);
}
public void DequeueMessages() {
ThreadPool.SetMaxThreads(200, 200);
ThreadPool.SetMinThreads(200, 200);
var ea = consumer.Queue.Dequeue();
ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
}
public void AgentTask() {
var instance = factory.GetInstance(threadItem);
while (true)
DequeueMessages();
}
private void ProcessWorkInThread(object state) {
var ea = state as BasicDeliverEventArgs;
var message = Encoding.UTF8.GetString(ea.Body);
var settings = new JsonSerializerSettings();
settings.ContractResolver = new DefaultContractResolver() { DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public };
var item = JsonConvert.DeserializeObject<TEntity>(message, settings);
Thread.Sleep(10000) //simulate work
lock (agentChannel)
agentChannel.BasicAck(ea.DeliveryTag, false);
}
相當確定IModel不是線程安全的。 .net用戶指南明確指出,不應該在線程之間共享IModel。 – user1450877 2014-10-31 14:50:31
@ user1450877我可以在dequeue&ack周圍添加鎖嗎? – 2014-10-31 15:01:35
必須爲線程使用一個IModel。 – Gabriele 2014-10-31 15:10:15