我有一個接收大消息(在RDBMS表中登陸)的流程,所以我無法在給定的時間處理太多的消息。因此,我正在使用<int:poller max-messages-per-poll="" />
節流處理,還有一些queues
與capacity
設置爲<int:queue capacity="">
。我知道多個線程/事務將參與這個流程,並且對於這種用法來說這是可以接受的。在Spring集成中如何最好地實現DynamicPoller
查詢輪詢數據庫需要一些時間來運行,因此我不想運行它比我需要更頻繁。此外,這個流接收到的消息往往會在「突發」內出現,這意味着它可能會得到1000條消息,然後在一個小時內就沒有消息進入。
我想要做的是使用dynamic-poller
,它將不經常輪詢(因爲注意到查詢運行成本高),除非我看到我得到了一連串的消息,在這種情況下我想非常頻繁地進行輪詢直到處理完所有消息。例如,如果我有<int:poller max-messages-per-poll="100" />
,並且我知道輪詢器只讀取了100條消息,那麼很可能在RDBMS中有更多消息需要處理,並且我們應該在處理完成後立即再次輪詢。
我知道春天不提供一種方法來修改trigger
使它在本質上是動態,並已看了看Spring集成參考「7.1.5 Change Polling Rate at Runtime「 並在dynamic-poller
樣本項目:Dynamic Poller
這是一個開始,但我真的需要poller
改變它是一個基於電流負載頻率。
我可能不是這個正確的,但我想也許加里提到這樣的事情會很有趣在他的談話,以落實「Implementing High-Availability Architectures with Spring Integration」。
在任何情況下編寫一個班級來改變poller
頻率似乎都不是什麼大不了的事情。更有挑戰性的是如何知道投票何時發生,因爲沒有任何內容被髮布到輸出頻道,所以沒有產生任何結果。
一些選項,我認爲:
附上
<int:wire-tap channel="" />
到調用<int:service-activator>
的poller
的通道。服務激活器檢查消息的數量並在DynamicPeriodicTrigger
上調整poller
的period
。
問題是,如果沒有收到消息,這將永遠不會被調用,所以一旦我調整更頻繁地輪詢輪詢期將無限期地保持。同#1,但添加邏輯以
DynamicPeriodicTrigger
,將還原period
回initialDelay
下一個觸發之後發生或一定的時間之後。使用
<int:poller>
元素中的<int:advice-chain>
元素與MethodInterceptor
實現。
類似於Artem在此link中建議的內容。 雖然這可以讓我在receive
方法的前面,但它不允許我訪問receive
方法的結果(這會給我檢索到的消息的數量)。請注意,這似乎是由加里提到的link所確認的。請求處理程序通知鏈是一個特例;我們必須注意只提供內部端點方法而不提供任何下游處理(在輸出通道上)。
爲輪詢者提供建議更簡單,因爲我們建議整個流程。如「7.1.4名稱空間支持」小節「AOP建議鏈」小節所述,您只需通過實施MethodInterceptor接口來創建建議。
見
SourcePollingChannelAdapterFactoryBeanTests.testAdviceChain()
一個非常簡單的建議......代碼:
adviceChain.add(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
adviceApplied.set(true);
return invocation.proceed();
}
});
這僅僅是用來斷言的意見是正確地稱呼;真正的建議會在invocation.proceed()之前和/或之後添加代碼。實際上,此建議建議所有方法,但只有一個,(
Callable.call()
)。跟一個切入點,看起來爲
Message<T> receive()
方法創建AfterReturning
建議。克隆
JdbcPollingChannelAdapter
並添加我的鉤子在這個新類。也許Gary建議在這個link將是有用的,但「主旨」鏈接不再有效。
更新:
我最終實現的選擇是使用AfterReturningAdvice
看起來像下面這樣。
原始代碼:
<int-jdbc:inbound-channel-adapter id="jdbcInAdapter"
channel="inputChannel" data-source="myDataSource"
query="SELECT column1, column2 from tableA"
max-rows-per-poll="100">
<int:poller fixed-delay="10000"/>
</int-jdbc:inbound-channel-adapter>
新代碼:
<bean id="jdbcDynamicTrigger" class="DynamicPeriodicTrigger">
<constructor-arg name="period" value="20000" />
</bean>
<bean id="jdbcPollerMetaData" class="org.springframework.integration.scheduling.PollerMetadata">
<property name="maxMessagesPerPoll" value="1000"/>
<property name="trigger" ref="jdbcDynamicTrigger"/>
</bean>
<bean id="pollMoreFrequentlyForHighVolumePollingStrategy" class="springintegration.scheduling.PollMoreFrequentlyForHighVolumePollingStrategy">
<property name="newPeriod" value="1"/>
<property name="adjustmentThreshold" value="100"/>
<property name="pollerMetadata" ref="jdbcPollerMetaData"/>
</bean>
<aop:config>
<aop:aspect ref="pollMoreFrequentlyForHighVolumePollingStrategy" >
<aop:after-returning pointcut="bean(jdbcInAdapterBean) and execution(* *.receive(..))" method="afterPoll" returning="returnValue"/>
</aop:aspect>
</aop:config>
<bean id="jdbcInAdapterBean" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
<constructor-arg ref="myDataSource" />
<constructor-arg value="SELECT column1, column2 from tableA" />
<property name="maxRowsPerPoll" value="100" />
</bean>
<int:inbound-channel-adapter id="jdbcInAdapter" ref="jdbcInAdapterBean"
channel="inputChannel"
auto-startup="false">
<int:poller ref="jdbcPollerMetaData" />
</int:inbound-channel-adapter>
我做了些研究這個,覺得Spring集成也許可以提供一些掛鉤到輪詢使開發人員可以更好定製它們。
欲瞭解更多信息請參閱https://jira.spring.io/browse/INT-3633
如果JIRA沒有得到實現,有人有興趣在我實現了一個註釋添加到這一點,我會在GitHub上或依據可用的代碼。
是的你是對的,要點鏈接仍然有效。當我打開此問題時,顯示它在我使用的瀏覽器內部被阻止。 – 2015-02-17 17:19:15