我對某些pub/sub(發佈者/訂閱者)代碼使用.Net RabbitMQ。一切正常,直到我開始關閉消費者。消費者正確處理已發佈的數據,直到關閉最後一位消費者。畢竟所有的消費者,我開了一個新的消費者,但沒有任何反應。應用程序將打開,但它不會從發佈者處收到任何數據。RabbitMQ Pub/Sub:關閉最後一位消費者關閉發行商的頻道/型號。爲什麼?
我檢查了發佈者代碼,發現當最後一位消費者關閉時,其頻道的IsOpen屬性變爲false。即使消費者關閉後,我也不知道是否有一些設置可以保持渠道暢通。
這是我的發佈者代碼: 編輯我原來粘貼了錯誤的代碼。
這裏是我的消費者代碼:
public MyConsumer
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly Timer _timer;
private SubscriptionConsumerType(string ipAddress, string exchangeName, TimeSpan tsPullCycle)
{
//set up connection
this._factory = new ConnectionFactory();
this._factory.HostName = ipAddress;
this._connection = this._factory.CreateConnection();
this._channel = this._connection.CreateModel();
//set up and bind the exchange
this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>());
string queueName = this._channel.QueueDeclare().QueueName;
this._channel.QueueBind(queueName, exchangeName, "");
//start consuming
QueueingBasicConsumer consumer = new QueueingBasicConsumer(this._channel);
this._channel.BasicConsume(queueName, true, consumer);
//periodically check for new messages from the publisher
this._timer = new Timer(new TimerCallback(this.TimerStep), consumer, tsPullCycle, tsPullCycle);
}
public void Dispose()
{
if (this._timer != null)
this._timer.Dispose();
if (this._channel != null)
{
this._channel.Close();
this._channel.Dispose();
}
if (this._connection != null)
{
this._connection.Close();
this._connection.Dispose();
}
}
}
現在,我對這個解決辦法是,總是有一個消費者窗口打開的地方。不過,理想情況下,我希望我的發佈者無論打開多少個消費者窗口都能運行。謝謝。
編輯糟糕,我粘貼了錯誤的生產者代碼。這裏是:
private SubscriptionBroadcastType(string ipAddress, string exchangeName)
{
this._factory = new ConnectionFactory();
this._factory.HostName = ipAddress;
this._connection = this._factory.CreateConnection();
this._channel = this._connection.CreateModel();
this._exchangeName = exchangeName;
this._channel.ExchangeDeclare(exchangeName, SubscriptionBroadcastType.BROADCAST, SubscriptionBroadcastType.DURABLE, SubscriptionBroadcastType.AUTO_DELETE, new Dictionary<string, object>());
}
public void BroadcastMessage(string message)
{
lock (this._syncroot) //protect _channel
{
if (this._channel.IsOpen)
this._channel.BasicPublish(this._exchangeName, "", null, System.Text.Encoding.UTF8.GetBytes(message));
}
}
你說得對,我粘貼了錯誤的生產者代碼。謝謝你指出。我編輯了我的原始帖子來解決這個問題。 – user1214135