2017-08-01 42 views
1

我已經創建了一個具有20個內核的指定線程池的連接。只有一個線程在執行器服務中同時運行,RabbitMQ

 ConnectionFactory factory = new ConnectionFactory(); 
     .... 
     //specified es 
     ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory); 
     con = factory.newConnection(consumerExecutor, addresses); 

然後從該連接建立頻道:

 final Channel channel = connection.createChannel(); 

並使用它來創建DefaultConsumer。

雖然我發現儘管線程可以用來消費消息,但總是隻有一個線程正在消費消息,即使消息在服務器中大量累積。

我看看源代碼,並發現:

private final class WorkPoolRunnable implements Runnable { 

    @Override 
    public void run() { 
     int size = MAX_RUNNABLE_BLOCK_SIZE; 
     List<Runnable> block = new ArrayList<Runnable>(size); 
     try { 
      Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); 
      if (key == null) return; // nothing ready to run 
      try { 
       for (Runnable runnable : block) { 
        runnable.run(); 
       } 
      } finally { 
       if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { 
        ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); 
       } 
      } 
     } catch (RuntimeException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 


/* Basic work selector and state transition step */ 
private K readyToInProgress() { 
    K key = this.ready.poll(); 
    if (key != null) { 
     this.inProgress.add(key); 
    } 
    return key; 
} 


/** 
* Return the next <i>ready</i> client, 
* and transfer a collection of that client's items to process. 
* Mark client <i>in progress</i>. 
* If there is no <i>ready</i> client, return <code><b>null</b></code>. 
* @param to collection object in which to transfer items 
* @param size max number of items to transfer 
* @return key of client to whom items belong, or <code><b>null</b></code> if there is none. 
*/ 
public K nextWorkBlock(Collection<W> to, int size) { 
    synchronized (this) { 
     K nextKey = readyToInProgress(); 
     if (nextKey != null) { 
      VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey); 
      drainTo(queue, to, size); 
     } 
     return nextKey; 
    } 
} 

訣竅應該是ConsumerWorkService.this.workPool.nextWorkBlock,它輪詢從就緒隊列的通道,並運行回調後添加到終點塊讀隊列run()。如果我錯了,請糾正我。

這是令人困惑的,因爲消費者綁定到一個通道,並且直到最後一個任務塊完成時纔將通道釋放到隊列中,這意味着線程池始終只爲該消費者提供一個線程。

問題:

  1. 爲什麼RabbitMQ的設計,此模型
  2. 我們如何優化這一問題
  3. 是好到任務提交給一個獨立的線程池handleDelivery消費消息以及ACK (確保消息ACK任務完成後,才)

回答

0

> 1.爲什麼RabbitMQ的設計這個模型

我想知道我自己的原因。但是這個事實清楚地體現在它們的documentation

每個通道都有它自己的調度線程。對於每個頻道一個消費者最常見的使用案例 ,這意味着消費者不會阻止其他消費者購買其他 消費者。如果每個頻道有多個消費者,請注意長時間運行的消費者可能會阻止向該頻道上的其他 消費者發送回叫。

> 2.我們怎樣優化這個問題

您可以有多個頻道或處理提交實際工作到另一個線程池脫鉤消息消費。你可以在this article找到更多的細節。

> 3。這是好到任務提交到handleDelivery一個獨立的線程池消耗消息以及ACK(確保消息ACK任務完成後,才)

報價從docs

當手動確認是使用,重要的是考慮 什麼線程確認。如果是從接收交付 線程不同(例如消費者#handleDelivery 委託交付辦案,以不同的線程),與 承認設置爲true多個參數是不安全的,並會導致 雙重確認,因此通道級別協議 關閉通道的異常。確認一條消息在 時間可能是安全的。

相關問題