2012-06-04 35 views
2

我對某些pub/sub(發佈者/訂閱者)代碼使用.Net RabbitMQ。一切正常,直到我開始關閉消費者。消費者正確處理已發佈的數據,直到關閉最後一位消費者。畢竟所有的消費者,我開了一個新的消費者,但沒有任何反應。應用程序將打開,但它不會從發佈者處收到任何數據。RabbitMQ Pub/Sub:關閉最後一位消費者關閉發行商的頻道/型號。爲什麼?

我檢查了發佈者代碼,發現當最後一位消費者關閉時,其頻道的IsOpen屬性變爲false。即使消費者關閉後,我也不知道是否有一些設置可以保持渠道暢通。

這是我的發佈者代碼: 編輯我原來粘貼了錯誤的代碼。

這裏是我的消費者代碼:

public MyConsumer 
{ 
private readonly ConnectionFactory _factory; 
private readonly IConnection _connection; 
private readonly IModel _channel; 
private readonly Timer _timer; 

private SubscriptionConsumerType(string ipAddress, string exchangeName, TimeSpan tsPullCycle) 
{ 
    //set up connection 
    this._factory = new ConnectionFactory(); 
    this._factory.HostName = ipAddress; 
    this._connection = this._factory.CreateConnection(); 
    this._channel = this._connection.CreateModel(); 

    //set up and bind the exchange 
    this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>()); 
    string queueName = this._channel.QueueDeclare().QueueName; 
    this._channel.QueueBind(queueName, exchangeName, ""); 

    //start consuming 
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(this._channel); 
    this._channel.BasicConsume(queueName, true, consumer); 

    //periodically check for new messages from the publisher 
    this._timer = new Timer(new TimerCallback(this.TimerStep), consumer, tsPullCycle, tsPullCycle); 
} 

public void Dispose() 
{ 
    if (this._timer != null) 
     this._timer.Dispose(); 

    if (this._channel != null) 
    { 
     this._channel.Close(); 
     this._channel.Dispose(); 
    } 

    if (this._connection != null) 
    { 
     this._connection.Close(); 
     this._connection.Dispose(); 
    } 
} 
} 

現在,我對這個解決辦法是,總是有一個消費者窗口打開的地方。不過,理想情況下,我希望我的發佈者無論打開多少個消費者窗口都能運行。謝謝。

編輯糟糕,我粘貼了錯誤的生產者代碼。這裏是:

private SubscriptionBroadcastType(string ipAddress, string exchangeName) 
{ 
    this._factory = new ConnectionFactory(); 
    this._factory.HostName = ipAddress; 
    this._connection = this._factory.CreateConnection(); 
    this._channel = this._connection.CreateModel(); 

    this._exchangeName = exchangeName; 
    this._channel.ExchangeDeclare(exchangeName, SubscriptionBroadcastType.BROADCAST, SubscriptionBroadcastType.DURABLE, SubscriptionBroadcastType.AUTO_DELETE, new Dictionary<string, object>()); 
} 

public void BroadcastMessage(string message) 
{ 
    lock (this._syncroot) //protect _channel 
    { 
     if (this._channel.IsOpen) 
      this._channel.BasicPublish(this._exchangeName, "", null, System.Text.Encoding.UTF8.GetBytes(message)); 
    } 
} 

回答

2

我想你可能在這裏有一些根本性的錯誤。請檢查您是否發佈了正確的代碼。當我讀到它時,你有一個生產者創建一個特定的命名隊列並直接發佈到隊列中。您有一位消費者創建一個特定的指定交易所,然後創建一個動態命名的新隊列並將其綁定到交易所。然後您將從此隊列中讀取數據,但它不能是您最初發布到的隊列。

我會先修復您的代碼,然後在您的發佈商代碼中添加使用您在消費者代碼中可以訪問的特定名稱創建交換。這條線將出現在生產線,而不是排隊申報行:

this._channel.ExchangeDeclare(exchangeName, "fanout", false, true, new Dictionary<string, object>()); 

然後,你將需要發佈到交換,而不是發佈到隊列變化的線路,爲:

this._channel.BasicPublish(exchangeName, "", this._basicProperties, System.Text.Encoding.UTF8.GetBytes(message)); 

您的消費者應該設置好,以便從隊列中接收這些消息。看看是否有助於解決您的問題。

+0

你說得對,我粘貼了錯誤的生產者代碼。謝謝你指出。我編輯了我的原始帖子來解決這個問題。 – user1214135