1
下面是一個簡單的.net核心1.1控制檯應用程序。使用-r參數調用它,它讀取rabbitmq隊列中的所有消息,用任何其他參數調用它,並將每個參數作爲消息排隊。.NET Core 1.1接收RabbitMQ消息
下面是問題所在,我可以將消息排入隊列,但所有嘗試讀取消息都不會導致消息被讀取。顯然,我沒有正確使用隊列,並希望得到一些指導。
謝謝!
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitMqDemo
{
class Program
{
static void Main(string[] args)
{
var client = new MessagingClient();
if (args.Length == 1 && args[0].ToLower() == "-r")
{
Console.WriteLine("Reading Messages from Queue.");
var messages = client.ReceiveMessages();
Console.WriteLine($"Read {messages.Length} message(s) from queue.");
foreach(var msg in messages)
Console.WriteLine(msg);
}
else
{
foreach (var msg in args)
{
client.SendMessage(msg);
}
Console.WriteLine($"Enqueued {args.Length} Message.");
}
}
}
internal class MessagingClient
{
private readonly ConnectionFactory connectionFactory;
private string ExchangeName => "defaultExchange";
private string RoutingKey => "";
private string QueueName => "Demo";
private string HostName => "localhost";
public MessagingClient()
{
this.connectionFactory = new ConnectionFactory {HostName = this.HostName};
}
public void SendMessage(string message)
{
using (var connection = this.connectionFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
this.QueueDeclare(channel, this.QueueName);
var properties = this.SetMessageProperties(channel, message);
string messageJson = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(messageJson);
channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body);
}
}
}
public string[] ReceiveMessages()
{
var messages = new List<string>();
using (var connection = this.connectionFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
this.QueueDeclare(channel, this.QueueName);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string bodystring = Encoding.UTF8.GetString(ea.Body);
messages.Add(bodystring);
// ReSharper disable once AccessToDisposedClosure
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: this.QueueName, autoAck: false, consumer: consumer);
}
}
return messages.ToArray();
}
private void QueueDeclare(IModel channel, string queueName)
{
channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
var queueDeclared = channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queueName, ExchangeName, RoutingKey);
}
private IBasicProperties SetMessageProperties(IModel channel, object message)
{
var properties = channel.CreateBasicProperties();
properties.ContentType = "application/json";
properties.Persistent = true;
return properties;
}
}
}
男人讓我感到愚蠢。當然,它不會阻止正在執行的線程。我添加了一些阻止邏輯,它像一個冠軍。謝謝盧克! – cdarrigo
很高興我能幫到你 –