2016-12-14 96 views
2

立足我的建築離這個例子:通道在RabbitMQ的空轉與工作隊列情景

RabbitMQ - Work Queues

設置:

  • 工人同時
  • 得到一個消息,每個勞工下載量文檔,需要幾秒鐘
  • 工人成功下載文檔後,它會確認郵件
  • 如果工人不能下載一個文件,它爲noAcks重新排隊(三級重試的最大值)

我期待到我的實現的瓶頸是造成減速的消息。因爲我使用noAck來重新排隊失敗的工人。要啓用此均勻地分佈在我的工作線程我已經設置預取1.Looking在這個問題:RabbitMQ work queue is blocking consumers - 他們看到我所看到的在下面的截圖:

acks/second

channels

要使確保工人一次只能分配一條消息,我需要將預取設置爲1,但也有人說這會導致工人順序工作而不是並行工作。

跑步在渠道層面究竟意味着什麼?我看到隊列和連接正常運行,但單個通道(每個線程一個)空閒。

編輯#1:關於將連接池傳遞給RabbitMQ連接的說明看起來很有希望。 https://www.rabbitmq.com/api-guide.html#consumer-thread-pool我使用Spring AMQP,但我認爲類似的方法可以用在這裏:

 /** 
    * Configure a large thread pool for concurrent channels on the physical Connection 
    */ 
    @Bean 
    public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() { 
     logger.info("Configuring connection factory"); 
     CachingConnectionFactory cf = new CachingConnectionFactory(); 
     cf.setAddresses(this.rabbitMQProperties.getAddresses()); 
     cf.setUsername(this.rabbitMQProperties.getUsername()); 
     cf.setPassword(this.rabbitMQProperties.getPassword()); 
     cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost()); 
     //configure a large thread pool for the connection thread 
     int threads = 30; 
     logger.info(String.format("Configuring thread pool with %d threads", threads)); 
     ExecutorService connectionPool = Executors.newFixedThreadPool(threads); 
     cf.setExecutor(connectionPool); 
     logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString())); 
     logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize())); 
     logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize())); 
     return cf; 
    } 

    @Bean 
    AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){ 
     AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     return rabbitTemplate; 
    } 

回答

1

Apparantly春季AMQP單個物理TCP/IP連接的默認線程池默認爲5個線程:

Spring AMQP

此外,在寫入時,所述的RabbitMQ-客戶端庫創建默認每個連接(5個螺紋)固定線程池。在使用大量連接時,應考慮在CachingConnectionFactory上設置自定義執行程序。然後,所有連接將使用相同的執行程序,並且它的線程可以共享。執行程序的線程池應該是無界的,或者爲預期的利用率設置適當的值(通常每個連接至少有一個線程)。如果在每個連接上創建多個通道,則池大小將影響併發性,因此變量(或簡單緩存)的線程池執行程序將是最合適的。

我能夠通過改變分配給的RabbitMQ的連接池的線程數來複制此:

/** 
    * Expand the number of concurrent threads for a single RabbitMQ connection 
    * http://docs.spring.io/spring-amqp/reference/htmlsingle/ 
    * Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default. 
    * When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory. 
    */ 
    @Bean(name="channelPool") 
    @Scope("singleton") 
    MigrationPool rabbitConnectionPool(){ 
     int channels = 50; 
     logger.info(String.format("Configuring connection pool with %d threads", channels)); 
     return new MigrationPool(channels, channels, 0L, TimeUnit.MILLISECONDS, 
       new LinkedBlockingQueue<Runnable>()); 
    } 

    /** 
    * Configure a large thread pool for concurrent channels on the physical Connection 
    */ 
    @Bean 
    public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory(@Qualifier("channelPool") MigrationPool connectionPool) { 
     logger.info("Configuring connection factory"); 
     CachingConnectionFactory cf = new CachingConnectionFactory(); 
     cf.setAddresses(this.rabbitMQProperties.getAddresses()); 
     cf.setUsername(this.rabbitMQProperties.getUsername()); 
     cf.setPassword(this.rabbitMQProperties.getPassword()); 
     cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost()); 
     cf.setExecutor(connectionPool); 
     logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString())); 
     logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize())); 
     logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize())); 
     logger.info(String.format("MQ thread pool: %d threads", connectionPool.getMaximumPoolSize())); 
     return cf; 
    } 

在上面的代碼我有每個連接線的鏡像虛擬信道即數數每個虛擬RabbitMQ通道有一個真實的物理線程,因爲每個通道都引用一個工作線程,每個線程處理一條消息。這將導致渠道不再阻塞默認的5個連接,而不是採取線程的數量擴張充分利用:

channels no longer blocking

操縱可用的RabbitMQ的連接將顯示通道阻塞的線程數。例如,設置爲10個線程並打開50個通道。