我觀察奇怪的行爲,而這樣做的RabbitMQ 3.3.5的POC使用彈簧兔1.3.9.RELEASE庫併發生產者塊無限期RabbitMQ的
當我開始一個單一的生產線,事情順利進行。但是,如果同時啓動多個線程,只有其中一個線程完成,所有其他線程無限期地被阻塞,即使在隊列變空之後。
從rabbitmqctl list_connections
進行監視時,被阻止的線程連接的狀態仍保持運行狀態。應該注意的是,當生產者阻止時,或者在整個運行期間的任何其他時間,沒有警報。
我也觀察到,如果我在每次發送後放置1毫秒的睡眠,問題就會消失。
所以,我有這些問題
- 難道不是的RabbitMQ在高利率支持併發生產者,出版?
- 即使連接確實被阻塞,爲什麼它不顯示在rabbitmqctl list_connections中?
- 爲什麼他們無限期地阻塞而不恢復乳清隊列變空了?
代碼
public static void main(String[] argv) throws java.io.IOException, InterruptedException {
init();
PocConfig config = new PocConfig();
int threadCount = config.getThreadCount();
final int eventsPerThread = config.getEvents()/threadCount;
final long sleep = config.getSleep();
System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep="
+ sleep + "]");
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
executorService.submit(new Runnable() {
public void run() {
produce(eventsPerThread, sleep, threadId);
}
});
}
waitAndTearDown(executorService);
}
private static void produce(int events, long sleep, int threadId) {
long start = System.currentTimeMillis();
for (int index = 1; index <= events; index++) {
try {
byte[] message = messageFactory.createTestMessage(index);
amqpTemplate.convertAndSend(QUEUE_NAME, message);
if (sleep > 0) {
Thread.sleep(sleep);
}
} catch (Exception e) {
LOG.error("Error", e);
}
}
long time = System.currentTimeMillis() - start;
System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time/1000 + ", tps: " + (events * 1000)/time);
}
Spring配置
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="addresses" value="${addresses}" />
<property name="username" value="${user}" />
<property name="password" value="${passwd}" />
<property name="cacheMode" value="CONNECTION" />
<property name="connectionCacheSize" value="${threads}" />
<property name="channelCacheSize" value="10" />
</bean>
<rabbit:template id="template" connection-factory="connectionFactory"
exchange="testExchange" routing-key="testQueue"/>
我正在測試一百萬個事件,無論如何,這裏是線程轉儲的關鍵部分http://pastebin.com/0B035D0j我知道他們被卡住了,因爲他們不打印完成消息,以及消息計數在一個線程完成後,兔子不會增加。這裏的示例日誌http://pastebin.com/37rk8bYp –
加里,你在本地主機上用兔子測試?在羣集中的另一臺計算機上執行rabbitmq命令時,我只看到了這一點,而不是在本地主機上。關於流量模式,是的,我看到一個連接進入流量模式,然後恢復,但所有其他連接始終保持運行。 –
加里,問題似乎與ConnectionFactory,因爲事情運行良好,如果我啓動5個獨立的進程與每個線程,或者如果我有一個單獨的進程中的每個線程單獨的連接工廠。 –