2016-05-31 44 views
1

我一直在玩RabbitMQ和它的.NET連接器一段時間。 經過一段時間的測試後,我使用RabbitMQ作爲Web應用程序和API網絡中的經紀人的新系統達到了產量。 所有的應用程序都在幾分鐘內卡住了。 我想我打了一些依賴於操作系統的閾值或者搞砸了一些TCP堆棧(在我的客戶端的TCP/IP連接甚至沒有再打到網絡服務器之後)。RabbitMQ,.NET和多線程

我還沒有找到關於如何處理通過數十個進程和數千個線程的激烈流量傳播的好材料。

我有一個系統,每秒生成10k +線程,每個人都必須通過連接刪除消息,然後終止。 對於所有這些消息我只是一些'捕手'。

已經採取了一些對策。 不用於每個新線程的新連接 - >聲明一個連接工廠和使用靜態連接(連接上和渠道功能已經說是線程安全的) - >解決

這裏的工廠

public static class Factory 
    { 
     private static IConnection sharedConnection_cl; 
     public static IConnection SharedConnection_cl 
     { 
      get 
      { 
       if (sharedConnection_cl == null) 
       { 
        sharedConnection_cl = GetRabbitMqConnection(); 
       } 
       return sharedConnection_cl; 
     } 

    private static IConnection GetRabbitMqConnection() 
      { 
       ConnectionFactory connectionFactory = new    ConnectionFactory(); 
       connectionFactory.HostName = "x.y.z.w"; 
       connectionFactory.UserName = "notTheGuestUser"; 
       connectionFactory.Password = "abcdef"; 
       connectionFactory.VirtualHost = "/dedicatedHost"; 
       return connectionFactory.CreateConnection(); 
      } 

沒有代碼來使用所有可用的Erlang進程。 Erlang進程閾值在不到10分鐘的時間內達到(同一連接上的封閉信道不會觸發服務器上相應Erlang進程的死亡).-> 在任何給定連接的最大信道計數上添加閾值並保護使用信號量進行訪問。每過一段時間的連接被關閉並重新創建(當引黃關閉相應的Erlang進程終止) - >解決

下面是管理渠道門檻

public static class Factory 
{ 
    private static IConnection sharedConnection_cl; 
    private static int channelCount_int { get; set; } 
    static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1); 
    public static IConnection SharedConnection_cl 
    { 
     get 
     { 
      if (sharedConnection_cl == null) 
      { 
       sharedConnection_cl = GetRabbitMqConnection(); 
       channelCount_int = 0; 
      } 
      connectionAccessSemaphore_sem.WaitOne(); 
      if (channelCount_int > 10000) 
      { 

       sharedConnection_cl.Close(); 
       sharedConnection_cl = GetRabbitMqConnection(); 
       channelCount_int = 0; 
      } 
      else 
      { 
       channelCount_int++; 
      } 
      connectionAccessSemaphore_sem.Release(); 
      return sharedConnection_cl; 
     } 
    } 

現在的代碼.. 。這隻會增加操作系統標準閾值(這隻會將不可避免的數據塊的痛苦從幾分鐘延長到幾個小時)...

是否有任何良好的做法來管理連接和通道,以避免操作系統門檻的出現以及satur的出現趨勢?

感謝您的支持。

+0

「我對這些消息只有幾個'捕手'。」你的意思是訂戶嗎? –

+0

這可能有助於https://www.rabbitmq.com/memory.html,https://www.rabbitmq.com/persistence-conf.html –

+0

看看這個微服務架構,也許它會幫助你。 https://msdn.microsoft。com/en-us/magazine/mt595752.aspx – Jaider

回答

1

好的,解決方案已經在那裏。只是向上移動信號量就是訣竅。我沒有考慮到在系統重啓時,當所有appPools結束時,我在連接實例分配上遇到併發問題。 - >解決

public static class Factory{ 
private static IConnection sharedConnection_cl; 
private static int channelCount_int { get; set; } 
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1); 
public static IConnection SharedConnection_cl 
{ 
    get 
    { 
     connectionAccessSemaphore_sem.WaitOne(); 

     if (sharedConnection_cl == null) 
     { 
      sharedConnection_cl = GetRabbitMqConnection(); 
      channelCount_int = 0; 
     } 

     if (channelCount_int > 10000) 
     { 

      sharedConnection_cl.Close(); 
      sharedConnection_cl = GetRabbitMqConnection(); 
      channelCount_int = 0; 
     } 
     else 
     { 
      channelCount_int++; 
     } 
     connectionAccessSemaphore_sem.Release(); 
     return sharedConnection_cl; 
    } 
} 

不知道爲什麼AppPools的鎖鎖定的服務器上的所有TCP連接。