2017-08-23 65 views
1

我正在研究Spring應用程序,它將每分鐘收到大約500個xml郵件。下面的xml配置只允許每分鐘處理大約60條消息,其餘的消息存儲在隊列中(保存在數據庫中),並以每分鐘60條消息的速度檢索。春季集成:如何增加傳入郵件的處理

嘗試從多個來源閱讀文檔,但仍然不清楚Poller與任務執行程序結合的作用。我理解爲什麼當前每分鐘處理60條消息是因爲輪詢器配置中的「固定延遲」值設置爲10(因此它將在1分鐘內輪詢6次)以及「最大每輪詢消息」設置爲10,因此每分鐘處理6x10 = 60條消息。

請指教我的理解是否正確,並有助於修改xml配置以實現更高速率處理傳入消息。

任務執行程序的角色也不清楚 - 這是否意味着pool-size =「50」將允許50個線程並行運行以處理輪詢器所輪詢的消息?

我想在整個的是:

  1. JdbcChannelMessageStore用於傳入XML消息存儲在數據庫中(INT_CHANNEL_MESSAGE)表。這是必需的,所以在服務器重啓的情況下,消息仍然存儲在表中而不會丟失。
  2. 傳入的消息並行執行,但數量控制/限制。根據系統處理這些消息的能力,我想限制系統應該並行處理多少個消息。
  3. 由於此配置將用於羣集中的多個服務器,因此任何服務器都可以接收任何消息,因此它不應導致兩臺服務器處理同一消息的任何衝突。希望這是由Spring Integration來處理的。

道歉,如果這已被回答其他地方,但在看過無數帖子後,我仍不明白這是如何工作的。

在此先感謝。

<!-- Message Store configuration start -->    

    <!-- JDBC message store configuration --> 
    <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore"> 
     <property name="dataSource" ref="dataSource"/> 
     <property name="channelMessageStoreQueryProvider" ref="queryProvider"/> 
     <property name="region" value="TX_TIMEOUT"/> 
     <property name="usingIdCache" value="true"/> 
    </bean> 

    <bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider" />   

<int:transaction-synchronization-factory 
    id="syncFactory"> 
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" /> 
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())" /> 
</int:transaction-synchronization-factory> 

<task:executor id="pool" pool-size="50" queue-capacity="100" rejection-policy="CALLER_RUNS" /> 

<int:poller id="messageStorePoller" fixed-delay="10" 
    receive-timeout="500" max-messages-per-poll="10" task-executor="pool" 
    default="true" time-unit="SECONDS"> 
    <int:transactional propagation="REQUIRED" 
     synchronization-factory="syncFactory" isolation="READ_COMMITTED" 
     transaction-manager="transactionManager" /> 
</int:poller> 

<bean id="transactionManager" 
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> 
<!-- 1)  Store the message in persistent message store --> 
    <int:channel id="incomingXmlProcessingChannel"> 
     <int:queue message-store= "store" /> 
    </int:channel> 

    <!-- 2) Check in, Enrich the headers, Check out --> 
    <!-- (This is the entry point for WebService requests) --> 
    <int:chain input-channel="incomingXmlProcessingChannel" output-channel="incomingXmlSplitterChannel"> 
     <int:claim-check-in message-store="simpleMessageStore" /> 
     <int:header-enricher > 
      <int:header name="CLAIM_CHECK_ID" expression="payload"/> 
      <int:header name="MESSAGE_ID" expression="headers.id" /> 
      <int:header name="IMPORT_ID" value="XML_IMPORT"/> 
     </int:header-enricher> 
     <int:claim-check-out message-store="simpleMessageStore" />   
    </int:chain> 

增加從阿爾喬姆響應後:

由於阿爾喬姆。因此,在固定延遲10秒後發生的每次輪詢(按照上面的配置),任務執行程序將檢查任務隊列,並且如果可能(並且需要)啓動新任務?並且每個pollingTask(線程)將根據消息存儲(隊列)中的「maxMessagesPerPoll」配置接收「10」消息。

爲了實現更高的傳入消息處理時間,我應該減少輪詢器上的fixedDelay,以便任務執行程序可以啓動更多的線程嗎?如果我將fixedDelay設置爲2秒,則將啓動一個新線程來執行10條消息,並且大約30個這樣的線程將在一分鐘內啓動,在一分鐘內處理「大致」300條傳入消息。

對不起,在一個問題中要求太多 - 只是想解釋完整的問題。

回答

0

主要的邏輯是這樣的類背後:

private final class Poller implements Runnable { 

    private final Callable<Boolean> pollingTask; 

    Poller(Callable<Boolean> pollingTask) { 
     this.pollingTask = pollingTask; 
    } 

    @Override 
    public void run() { 
     AbstractPollingEndpoint.this.taskExecutor.execute(() -> { 
      int count = 0; 
      while (AbstractPollingEndpoint.this.initialized 
        && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0 
        || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) { 
       try { 
        if (!Poller.this.pollingTask.call()) { 
         break; 
        } 
        count++; 
       } 
       catch (Exception e) { 
        if (e instanceof MessagingException) { 
         throw (MessagingException) e; 
        } 
        else { 
         Message<?> failedMessage = null; 
         if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) { 
          Object resource = TransactionSynchronizationManager.getResource(getResourceToBind()); 
          if (resource instanceof IntegrationResourceHolder) { 
           failedMessage = ((IntegrationResourceHolder) resource).getMessage(); 
          } 
         } 
         throw new MessagingException(failedMessage, e); 
        } 
       } 
       finally { 
        if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) { 
         Object resource = getResourceToBind(); 
         if (TransactionSynchronizationManager.hasResource(resource)) { 
          TransactionSynchronizationManager.unbindResource(resource); 
         } 
        } 
       } 
      } 
     }); 
    } 

} 

正如你所看到的taskExecutor負責旋pollingTask直到maxMessagesPerPoll在一個線程。如果當前輪詢任務對於新計劃太長,則池中的其他線程將會涉及。但是一次輪詢中的所有消息都在同一個線程中處理,而不是並行處理。

這就是它的工作原理。既然你在一個SO問題中要求太多,我希望這些信息足以找出下一步的步驟。

+0

謝謝@Artem,在您的回覆後,我在上面的問題中添加了一個問題。在查看AbstractPollingEndpoint類後,它肯定會增加我的理解,但不確定是否正確理解了模式。 –

+0

是的,你的理解是正確的。你也可以考慮爲你的任務執行器使用'CallersRunPolicy',所以當池中沒有線程時,調度器將執行輪詢週期。但同時,在該線程空閒之前不會再有新的輪詢週期啓動。 –