2014-03-05 34 views
0

我有一個RabbitMQ SimpleRpcServer,我想知道如何正確處置它,當我完成它。該模式有一個主循環,直到它接收到消息,處理消息,然後再次阻止爲止。這意味着爲了打破循環,我必須發送一個特殊編碼的消息,處理程序可以使用它來擺脫循環。如何正確處理SimpleRpcServer對象?

我讀過RabbitMQ頻道應該從創建它們的同一個線程訪問。如果這是真的(我找不到源代碼),這是否意味着要處理我的SimpleRpcServer,我將不得不創建一個專門用於將關閉消息發送到主循環的新通道?

這裏的僞代碼:

主循環(actual code here):

//I run this on a ThreadPool thread 
foreach (var evt in m_subscription) //blocks until a message appears 
{ 
    ProcessRequest(evt); 
} 

處理程序:

private void ProcessRequest(evt) 
{ 
    if(evt.BasicProperties.ContentType == "close") //close flag 
    { 
     base.Close(); 
    } 
    else 
    { 
     //process message 
    } 
} 

處置代碼(具體創建一個新的信道用於消除主循環):

//I call this from the main thread 
public void Dipose() 
{ 
    var channel = new Channel(); 
    var props = new Properties("close"); //close flag 
    channel.BasicPublish(queueName, props); 
    channel.Dispose(); 
} 

請注意,我遺漏了一些初始化和處理代碼。我知道這段代碼不會編譯。

回答

1

是的,你不應該在兩個或多個線程之間共享通道,請閱讀http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.2.4/rabbitmq-dotnet-client-3.2.4-user-guide.pdf「2.10。 IModel不應線程」

之間共享要關閉您的頻道,你可以創建一個線程使用的信道,則關閉主線程上通道認購,如:

初始化連接,通道等。 。

ConnectionFactory connection_factory = new ConnectionFactory(); 
     IConnection conn = null; 
     IModel channel = null; 
     Subscription sub = null; 
     private void Connect() 
      try 
      { 
       conn = connection_factory.CreateConnection(); 
       channel = conn.CreateModel(); 
       sub = new Subscription(channel, your_queue, true); 
       StartSubscribeThread (sub); 
…. 

創建線程:

public void StartSubscribeThread(Subscription sub) 
{ 
    var t = new Thread(() => InternalStartSubscriber(sub)); 
    t.Start(); 
} 



private void InternalStartSubscriber(Subscription sub) 
{ 
    foreach (BasicDeliverEventArgs e in sub) 
    { 
    //Handle your messages 
    } 
} 

最後:

private void Disconnet() 
sub.Close(); 
channel.Close(); 
channel.Dispose(); 

用這種方法可以避免創建另一個通道來關閉第一個通道