而不是停止路線,我會建議使用Throttler EIP。
from("jms:queue:inbox")
.throttle(7000)
.timePeriodMillis(1000*60*60)
.to("log:result", "mock:result");
上面的示例將節流消息上接收到的jms:queue:inbox
被髮送到mock:result
確保最大7000個的消息,在任何1小時的窗口被髮送之前。
或者,更細粒度的控制,你可以定義一個限制路由策略如圖駱駝的throttling example:
<route routePolicyRef="myPolicy">
<from uri="jms:queue:inbox"/>
<transacted/>
<to uri="log:+++JMS +++?groupSize=100"/>
<to ref="foo"/>
</route>
節流警察的定義如下:
<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
<property name="scope" value="Context"/>
<!-- when we hit > 20 inflight exchanges then kick in and suspend the routes -->
<property name="maxInflightExchanges" value="20"/>
<!-- when we hit lower than 10% of the max = 2 then kick in and resume the routes the default percentage is 70% but in this demo we want a low value -->
<property name="resumePercentOfMax" value="10"/>
<!-- output throttling activity at WARN level -->
<property name="loggingLevel" value="WARN"/>
</bean>
編輯1:
如果您需要全局節流,那麼您可以先讓一個消費者讀取消息,如上所述限制所有消息,然後將它們重新發送到另一個隊列,並讓它們重新讀取並處理它們的分佈式消費者。
編輯2:
或者,您也可以實現自己的ThrottlingInflightRoutePolicy
訪問中央數據庫保存處理信息。這樣,你不需要一個「單節點主節氣門」。但是,數據庫也可能是單點故障。
客戶的路線政策將是我會這樣做的方式。當你說「按照我上面選擇的暫停方法」時,我認爲你的路由策略只是叫做stopConsumer()和startConsumer(),就像ThrottlingInflightRoutePolicy一樣。 – 2014-10-10 09:11:04
「throller」會有幫助嗎? http://camel.apache.org/throttler.html – vikingsteve 2014-10-10 09:43:17
@vikingsteve我需要在所有正在處理該隊列的機器上的隊列級別進行節流。您提到的節流只能用於限制單臺機器的處理。 – Denise 2014-10-10 10:50:07