2015-05-30 21 views
0

我觀察奇怪的行爲,而這樣做的RabbitMQ 3.3.5的POC使用彈簧兔1.3.9.RELEASE庫併發生產者塊無限期RabbitMQ的

當我開始一個單一的生產線,事情順利進行。但是,如果同時啓動多個線程,只有其中一個線程完成,所有其他線程無限期地被阻塞,即使在隊列變空之後。

rabbitmqctl list_connections進行監視時,被阻止的線程連接的狀態仍保持運行狀態。應該注意的是,當生產者阻止時,或者在整個運行期間的任何其他時間,沒有警報。

我也觀察到,如果我在每次發送後放置1毫秒的睡眠,問題就會消失。

所以,我有這些問題

  1. 難道不是的RabbitMQ在高利率支持併發生產者,出版?
  2. 即使連接確實被阻塞,爲什麼它不顯示在rabbitmqctl list_connections中?
  3. 爲什麼他們無限期地阻塞而不恢復乳清隊列變空了?

代碼

public static void main(String[] argv) throws java.io.IOException, InterruptedException { 
     init(); 
     PocConfig config = new PocConfig(); 
     int threadCount = config.getThreadCount(); 
     final int eventsPerThread = config.getEvents()/threadCount; 
     final long sleep = config.getSleep(); 

     System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep=" 
      + sleep + "]"); 

     ExecutorService executorService = Executors.newFixedThreadPool(threadCount); 
     for (int i = 0; i < threadCount; i++) { 
      final int threadId = i; 
      executorService.submit(new Runnable() { 
       public void run() { 
        produce(eventsPerThread, sleep, threadId); 
       } 
      }); 
     } 
     waitAndTearDown(executorService); 
    } 

    private static void produce(int events, long sleep, int threadId)  { 
     long start = System.currentTimeMillis(); 
     for (int index = 1; index <= events; index++) { 
      try { 
       byte[] message = messageFactory.createTestMessage(index); 
       amqpTemplate.convertAndSend(QUEUE_NAME, message); 
       if (sleep > 0) { 
        Thread.sleep(sleep); 
       } 
      } catch (Exception e) { 
       LOG.error("Error", e); 
      } 
     } 
     long time = System.currentTimeMillis() - start; 
     System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time/1000 + ", tps: " + (events * 1000)/time); 
    } 

Spring配置

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> 
    <property name="addresses" value="${addresses}" /> 
    <property name="username" value="${user}" /> 
    <property name="password" value="${passwd}" /> 
    <property name="cacheMode" value="CONNECTION" /> 
    <property name="connectionCacheSize" value="${threads}" /> 
    <property name="channelCacheSize" value="10" /> 
</bean> 

<rabbit:template id="template" connection-factory="connectionFactory" 
    exchange="testExchange" routing-key="testQueue"/> 

回答

1

沒有什麼我能想到的,會阻止的,所以我只是跑測試;並沒有問題:

Start producer with configuration [threadCount=5, events=10, sleep=0] 
Producer:2 finished, events: 1000, Time(s): 0, tps: 4405 
Producer:3 finished, events: 1000, Time(s): 0, tps: 4132 
Producer:1 finished, events: 1000, Time(s): 0, tps: 4048 
Producer:0 finished, events: 1000, Time(s): 0, tps: 3968 
Producer:4 finished, events: 1000, Time(s): 0, tps: 3952 

是什麼讓你覺得他們被阻止?

進行線程轉儲(例如使用jstack)來查看線程在做什麼。

編輯

我仍然不能複製,甚至用1M的消息和CacheMode CONNECTION ...

Start producer with configuration [threadCount=5, events=200000, sleep=0] 
Producer:0 finished, events: 200000, Time(s): 50, tps: 3959 
Producer:3 finished, events: 200000, Time(s): 53, tps: 3746 
Producer:1 finished, events: 200000, Time(s): 55, tps: 3635 
Producer:2 finished, events: 200000, Time(s): 55, tps: 3634 
Producer:4 finished, events: 200000, Time(s): 55, tps: 3629 

看到排隊進入flow模式(通過管理用戶界面),但一切恢復得很好。

我看到你的工人是流量控制下......

"pool-2-thread-3" prio=10 tid=0x00007f4af4849800 nid=0x65d5 runnable [0x00007f4ae082f000] 
java.lang.Thread.State: RUNNABLE 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 

你看到兔子什麼日誌?您在管理界面上看到關於電子郵件費率,狀態等的信息?不管怎樣,這似乎與Spring AMQP沒有任何關係;您需要聯繫rabbitmq-users谷歌組的rabbitmq球員。

(我正在用rabbitmq 3.4.2進行測試)。

EDIT2:

是絕對乾淨安裝的3.5.2 ...

Start producer with configuration [threadCount=5, events=200000, sleep=0] 
Producer:0 finished, events: 200000, Time(s): 39, tps: 5091 
Producer:1 finished, events: 200000, Time(s): 39, tps: 5002 
Producer:2 finished, events: 200000, Time(s): 40, tps: 4954 
Producer:3 finished, events: 200000, Time(s): 40, tps: 4951 
Producer:4 finished, events: 200000, Time(s): 40, tps: 4939 

,我看到在管理界面沒有flow狀態(在隊列中,但渠道/連接顯示他們正在流動,但又恢復)。

+0

我正在測試一百萬個事件,無論如何,這裏是線程轉儲的關鍵部分http://pastebin.com/0B035D0j我知道他們被卡住了,因爲他們不打印完成消息,以及消息計數在一個線程完成後,兔子不會增加。這裏的示例日誌http://pastebin.com/37rk8bYp –

+0

加里,你在本地主機上用兔子測試?在羣集中的另一臺計算機上執行rabbitmq命令時,我只看到了這一點,而不是在本地主機上。關於流量模式,是的,我看到一個連接進入流量模式,然後恢復,但所有其他連接始終保持運行。 –

+0

加里,問題似乎與ConnectionFactory,因爲事情運行良好,如果我啓動5個獨立的進程與每個線程,或者如果我有一個單獨的進程中的每個線程單獨的連接工廠。 –

相關問題