我正在研究Spring應用程序,它將每分鐘收到大約500個xml郵件。下面的xml配置只允許每分鐘處理大約60條消息,其餘的消息存儲在隊列中(保存在數據庫中),並以每分鐘60條消息的速度檢索。春季集成:如何增加傳入郵件的處理
嘗試從多個來源閱讀文檔,但仍然不清楚Poller與任務執行程序結合的作用。我理解爲什麼當前每分鐘處理60條消息是因爲輪詢器配置中的「固定延遲」值設置爲10(因此它將在1分鐘內輪詢6次)以及「最大每輪詢消息」設置爲10,因此每分鐘處理6x10 = 60條消息。
請指教我的理解是否正確,並有助於修改xml配置以實現更高速率處理傳入消息。
任務執行程序的角色也不清楚 - 這是否意味着pool-size =「50」將允許50個線程並行運行以處理輪詢器所輪詢的消息?
我想在整個的是:
- JdbcChannelMessageStore用於傳入XML消息存儲在數據庫中(INT_CHANNEL_MESSAGE)表。這是必需的,所以在服務器重啓的情況下,消息仍然存儲在表中而不會丟失。
- 傳入的消息並行執行,但數量控制/限制。根據系統處理這些消息的能力,我想限制系統應該並行處理多少個消息。
- 由於此配置將用於羣集中的多個服務器,因此任何服務器都可以接收任何消息,因此它不應導致兩臺服務器處理同一消息的任何衝突。希望這是由Spring Integration來處理的。
道歉,如果這已被回答其他地方,但在看過無數帖子後,我仍不明白這是如何工作的。
在此先感謝。
<!-- Message Store configuration start -->
<!-- JDBC message store configuration -->
<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
<property name="region" value="TX_TIMEOUT"/>
<property name="usingIdCache" value="true"/>
</bean>
<bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider" />
<int:transaction-synchronization-factory
id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>
<task:executor id="pool" pool-size="50" queue-capacity="100" rejection-policy="CALLER_RUNS" />
<int:poller id="messageStorePoller" fixed-delay="10"
receive-timeout="500" max-messages-per-poll="10" task-executor="pool"
default="true" time-unit="SECONDS">
<int:transactional propagation="REQUIRED"
synchronization-factory="syncFactory" isolation="READ_COMMITTED"
transaction-manager="transactionManager" />
</int:poller>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- 1) Store the message in persistent message store -->
<int:channel id="incomingXmlProcessingChannel">
<int:queue message-store= "store" />
</int:channel>
<!-- 2) Check in, Enrich the headers, Check out -->
<!-- (This is the entry point for WebService requests) -->
<int:chain input-channel="incomingXmlProcessingChannel" output-channel="incomingXmlSplitterChannel">
<int:claim-check-in message-store="simpleMessageStore" />
<int:header-enricher >
<int:header name="CLAIM_CHECK_ID" expression="payload"/>
<int:header name="MESSAGE_ID" expression="headers.id" />
<int:header name="IMPORT_ID" value="XML_IMPORT"/>
</int:header-enricher>
<int:claim-check-out message-store="simpleMessageStore" />
</int:chain>
增加從阿爾喬姆響應後:
由於阿爾喬姆。因此,在固定延遲10秒後發生的每次輪詢(按照上面的配置),任務執行程序將檢查任務隊列,並且如果可能(並且需要)啓動新任務?並且每個pollingTask(線程)將根據消息存儲(隊列)中的「maxMessagesPerPoll」配置接收「10」消息。
爲了實現更高的傳入消息處理時間,我應該減少輪詢器上的fixedDelay,以便任務執行程序可以啓動更多的線程嗎?如果我將fixedDelay設置爲2秒,則將啓動一個新線程來執行10條消息,並且大約30個這樣的線程將在一分鐘內啓動,在一分鐘內處理「大致」300條傳入消息。
對不起,在一個問題中要求太多 - 只是想解釋完整的問題。
謝謝@Artem,在您的回覆後,我在上面的問題中添加了一個問題。在查看AbstractPollingEndpoint類後,它肯定會增加我的理解,但不確定是否正確理解了模式。 –
是的,你的理解是正確的。你也可以考慮爲你的任務執行器使用'CallersRunPolicy',所以當池中沒有線程時,調度器將執行輪詢週期。但同時,在該線程空閒之前不會再有新的輪詢週期啓動。 –