2015-02-06 45 views
2

我有一個接收大消息(在RDBMS表中登陸)的流程,所以我無法在給定的時間處理太多的消息。因此,我正在使用<int:poller max-messages-per-poll="" />節流處理,還有一些queuescapacity設置爲<int:queue capacity="">。我知道多個線程/事務將參與這個流程,並且對於這種用法來說這是可以接受的。在Spring集成中如何最好地實現DynamicPoller

查詢輪詢數據庫需要一些時間來運行,因此我不想運行它比我需要更頻繁。此外,這個流接收到的消息往往會在「突發」內出現,這意味着它可能會得到1000條消息,然後在一個小時內就沒有消息進入。

我想要做的是使用dynamic-poller,它將不經常輪詢(因爲注意到查詢運行成本高),除非我看到我得到了一連串的消息,在這種情況下我想非常頻繁地進行輪詢直到處理完所有消息。例如,如果我有<int:poller max-messages-per-poll="100" />,並且我知道輪詢器只讀取了100條消息,那麼很可能在RDBMS中有更多消息需要處理,並且我們應該在處理完成後立即再次輪詢。

我知道春天不提供一種方法來修改trigger使它在本質上是動態,並已看了看Spring集成參考「7.1.5 Change Polling Rate at Runtime「 並在dynamic-poller樣本項目:Dynamic Poller
這是一個開始,但我真的需要poller改變它是一個基於電流負載頻率。
我可能不是這個正確的,但我想也許加里提到這樣的事情會很有趣在他的談話,以落實「Implementing High-Availability Architectures with Spring Integration」。
在任何情況下編寫一個班級來改變poller頻率似乎都不是什麼大不了的事情。更有挑戰性的是如何知道投票何時發生,因爲沒有任何內容被髮布到輸出頻道,所以沒有產生任何結果。

一些選項,我認爲:

  1. 附上<int:wire-tap channel="" />到調用<int:service-activator>poller的通道。服務激活器檢查消息的數量並在DynamicPeriodicTrigger上調整pollerperiod
    問題是,如果沒有收到消息,這將永遠不會被調用,所以一旦我調整更頻繁地輪詢輪詢期將無限期地保持。

  2. 同#1,但添加邏輯以DynamicPeriodicTrigger,將還原periodinitialDelay下一個觸發之後發生或一定的時間之後。

  3. 使用<int:poller>元素中的<int:advice-chain>元素與MethodInterceptor實現。
    類似於Artem在此link中建議的內容。 雖然這可以讓我在receive方法的前面,但它不允許我訪問receive方法的結果(這會給我檢索到的消息的數量)。請注意,這似乎是由加里提到的link所確認的。

    請求處理程序通知鏈是一個特例;我們必須注意只提供內部端點方法而不提供任何下游處理(在輸出通道上)。

    爲輪詢者提供建議更簡單,因爲我們建議整個流程。如「7.1.4名稱空間支持」小節「AOP建議鏈」小節所述,您只需通過實施MethodInterceptor接口來創建建議。

    SourcePollingChannelAdapterFactoryBeanTests.testAdviceChain()一個非常簡單的建議......

    代碼:
    adviceChain.add(new MethodInterceptor() {
    public Object invoke(MethodInvocation invocation) throws Throwable {
    adviceApplied.set(true);
    return invocation.proceed();
    }
    });
    這僅僅是用來斷言的意見是正確地稱呼;真正的建議會在invocation.proceed()之前和/或之後添加代碼。

    實際上,此建議建議所有方法,但只有一個,(Callable.call())。

  4. 跟一個切入點,看起來爲Message<T> receive()方法創建AfterReturning建議。

  5. 克隆JdbcPollingChannelAdapter並添加我的鉤子在這個新類。

  6. 也許Gary建議在這個link將是有用的,但「主旨」鏈接不再有效。

更新:
我最終實現的選擇是使用AfterReturningAdvice看起來像下面這樣。
原始代碼:

<int-jdbc:inbound-channel-adapter id="jdbcInAdapter" 
    channel="inputChannel" data-source="myDataSource" 
    query="SELECT column1, column2 from tableA" 
    max-rows-per-poll="100"> 
    <int:poller fixed-delay="10000"/> 
</int-jdbc:inbound-channel-adapter> 

新代碼:

<bean id="jdbcDynamicTrigger" class="DynamicPeriodicTrigger"> 
    <constructor-arg name="period" value="20000" /> 
</bean> 
<bean id="jdbcPollerMetaData" class="org.springframework.integration.scheduling.PollerMetadata"> 
    <property name="maxMessagesPerPoll" value="1000"/> 
    <property name="trigger" ref="jdbcDynamicTrigger"/> 
</bean> 
<bean id="pollMoreFrequentlyForHighVolumePollingStrategy" class="springintegration.scheduling.PollMoreFrequentlyForHighVolumePollingStrategy"> 
    <property name="newPeriod" value="1"/> 
    <property name="adjustmentThreshold" value="100"/> 
    <property name="pollerMetadata" ref="jdbcPollerMetaData"/> 
</bean> 
<aop:config> 
    <aop:aspect ref="pollMoreFrequentlyForHighVolumePollingStrategy" > 
     <aop:after-returning pointcut="bean(jdbcInAdapterBean) and execution(* *.receive(..))" method="afterPoll" returning="returnValue"/> 
    </aop:aspect> 
</aop:config> 
<bean id="jdbcInAdapterBean" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter"> 
    <constructor-arg ref="myDataSource" /> 
    <constructor-arg value="SELECT column1, column2 from tableA" /> 
    <property name="maxRowsPerPoll" value="100" /> 
</bean> 
<int:inbound-channel-adapter id="jdbcInAdapter" ref="jdbcInAdapterBean" 
    channel="inputChannel" 
    auto-startup="false"> 
    <int:poller ref="jdbcPollerMetaData" /> 
</int:inbound-channel-adapter> 

我做了些研究這個,覺得Spring集成也許可以提供一些掛鉤到輪詢使開發人員可以更好定製它們。
欲瞭解更多信息請參閱https://jira.spring.io/browse/INT-3633

如果JIRA沒有得到實現,有人有興趣在我實現了一個註釋添加到這一點,我會在GitHub上或依據可用的代碼。

回答

1

感謝您打開JIRA問題;我們應該討論那裏的功能,因爲堆棧溢出不適合擴展對話。

但是,我不確定上面的意思是「...但是」主旨「鏈接不再有效......」。它適用於我... https://gist.github.com/garyrussell/5374267但讓我們在JIRA討論。

+0

是的你是對的,要點鏈接仍然有效。當我打開此問題時,顯示它在我使用的瀏覽器內部被阻止。 – 2015-02-17 17:19:15

相關問題