2012-08-03 49 views
1

我期待以增加我寫對ActiveMQ的高通量生產商的表現,並根據this useAsyncSend會:異步.NET中發送的ActiveMQ

部隊使用異步的發送這增加了巨大的性能提升; ,但意味着無論是否發送了 消息,send()方法都會立即返回,這可能會導致消息丟失。

但是我看不出它對我的簡單測試用例有什麼影響。

使用這個非常基本的應用:

const string QueueName = "...."; 
const string Uri = "...."; 

static readonly Stopwatch TotalRuntime = new Stopwatch(); 

static void Main(string[] args) 
{ 
    TotalRuntime.Start(); 
    SendMessage(); 
    Console.ReadLine(); 
} 

static void SendMessage() 
{ 
    var session = CreateSession(); 
    var destination = session.GetQueue(QueueName); 
    var producer = session.CreateProducer(destination); 

    Console.WriteLine("Ready to send 700 messages"); 
    Console.ReadLine(); 

    var body = new byte[600*1024]; 

    Parallel.For(0, 700, i => SendMessage(producer, i, body, session));   
} 

static void SendMessage(IMessageProducer producer, int i, byte[] body, ISession session) 
{ 
    var message = session.CreateBytesMessage(body); 

    var sw = new Stopwatch(); 
    sw.Start(); 
    producer.Send(message); 
    sw.Stop(); 

    Console.WriteLine("Running for {0}ms: Sent message {1} blocked for {2}ms", 
      TotalRuntime.ElapsedMilliseconds, 
      i, 
      sw.ElapsedMilliseconds); 
}  

static ISession CreateSession() 
{ 
    var connectionFactory = new ConnectionFactory(Uri) 
            { 
             AsyncSend = true, 
             CopyMessageOnSend = false 
            }; 
    var connection = connectionFactory.CreateConnection(); 
    connection.Start(); 
    var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); 
    return session; 
} 

我得到以下輸出:

Ready to send 700 messages 

Running for 2430ms: Sent message 696 blocked for 12ms 
Running for 4275ms: Sent message 348 blocked for 1858ms 
Running for 5106ms: Sent message 609 blocked for 2689ms 
Running for 5924ms: Sent message 1 blocked for 2535ms 
Running for 6749ms: Sent message 88 blocked for 1860ms 
Running for 7537ms: Sent message 610 blocked for 2429ms 
Running for 8340ms: Sent message 175 blocked for 2451ms 
Running for 9163ms: Sent message 89 blocked for 2413ms 
..... 

這表明,每個消息需要800ms的發送和調用session.Send()塊約兩個半秒鐘。儘管文檔中說,

「send()方法會立即返回」

此外,這些數字基本上是相同的,如果我要麼改變並行進行的正常循環或改變AsyncSend = trueAlwaysSyncSend = true所以我不認爲異步開關工作在所有...

任何人都可以看到我在這裏失蹤,使發送異步嗎?


進一步測試後:

根據該絕大部分運行時間被花在等待同步蟻性能分析器。看起來問題在於各種傳輸類通過監視器在內部阻塞。特別是我似乎掛斷了MutexTransport的OneWay方法,它只允許一個線程一次訪問它。

看起來好像調用send將阻塞,直到先前的消息已完成,這就解釋了爲什麼我的輸出顯示,封鎖12ms的第一條消息,而未來了1858ms。我可以通過實現每個連接消息模式來實現多個傳輸,這樣可以改善問題並使消息並行發送,但是大大增加了發送單個消息的時間,並且消耗了太多似乎不太像的資源正確的解決方案。

我重新測試所有這一切與1.5.6並沒有看到任何區別。

+0

這裏缺少一些關鍵信息,NMS.ActiveMQ的哪些版本?您傳遞給經紀人的實際URI是什麼?經紀人是否啓用了生產者流量控制? – 2012-08-03 18:30:18

+0

@Tim好點,對不起。我運行的是版本1.5.1,URI是內部服務器,但形式爲「tcp:// ...」,我不知道代理是如何設置的 - 我可以訪問控制檯網頁,但似乎並未公開該信息。進一步的調查使我相信這個問題與我發送的消息的大小(600Kb)有關,但我仍在研究它,所以任何幫助都將不勝感激。 – 2012-08-03 18:37:41

回答

0

一如既往做的最好的事情就是更新到最新版本(1.5.6在寫這篇文章的時間)。如果代理啓用了生產者流量控制,並且您已達到隊列大小限制,則發送可以阻止,儘管使用異步發送不應該發生,除非您使用producerWindowSize集發送消息。獲得幫助的一個好方法是創建一個測試用例,並通過Jira問題將其提交給NMS.ActiveMQ網站,以便我們可以使用您的測試代碼查看它。從1.5.1開始有很多修正,所以我建議嘗試一下這個新版本,因爲它可能已經不是問題了。