2014-10-09 50 views
4

我的基本問題是,我只能在任何1小時的時間內從我的一個隊列(跨所有機器)處理7000條消息。我沒有看到用駱駝或activemq做到這一點,所以我採取了自己的路線停止/啓動邏輯。我看到了很多方法來做到這一點,我嘗試了其中的一些方法(只是遇到問題)。用activemq「駱駝」暫停路線的正確方法是什麼?

  1. camelContext.stopRoute(route):這工作,因爲信息停止正在處理,但是當我打電話camelContext.startRoute(route),它泄漏TCP連接,最終導致的ActiveMQ服務器打了極限而死亡。
  2. camelContext.suspendRoute(route):這也會停止正在處理的郵件,並且不會泄漏連接,但它似乎會殺死當我撥打camelContext.resumeRoute(route)時不會重新激活的活動使用者(在管理面板中可見)。我認爲,即使我恢復,最終可能導致沒有消息從隊列中被處理掉。
  3. 實施自定義RoutePolicy。公平地說,我還沒有嘗試過,但似乎它會成爲我根據我上面選擇的暫停方法而遇到的同樣問題的犧牲品。

有沒有解決這個問題的方法,我還沒有遇到過?

+0

客戶的路線政策將是我會這樣做的方式。當你說「按照我上面選擇的暫停方法」時,我認爲你的路由策略只是叫做stopConsumer()和startConsumer(),就像ThrottlingInflightRoutePolicy一樣。 – 2014-10-10 09:11:04

+0

「throller」會有幫助嗎? http://camel.apache.org/throttler.html – vikingsteve 2014-10-10 09:43:17

+0

@vikingsteve我需要在所有正在處理該隊列的機器上的隊列級別進行節流。您提到的節流只能用於限制單臺機器的處理。 – Denise 2014-10-10 10:50:07

回答

3

而不是停止路線,我會建議使用Throttler EIP

from("jms:queue:inbox") 
    .throttle(7000) 
    .timePeriodMillis(1000*60*60) 
    .to("log:result", "mock:result"); 

上面的示例將節流消息上接收到的jms:queue:inbox被髮送到mock:result確保最大7000個的消息,在任何1小時的窗口被髮送之前。

或者,更細粒度的控制,你可以定義一個限制路由策略如圖駱駝的throttling example

<route routePolicyRef="myPolicy"> 
    <from uri="jms:queue:inbox"/> 
    <transacted/> 
    <to uri="log:+++JMS +++?groupSize=100"/> 
    <to ref="foo"/> 
</route> 

節流警察的定義如下:

<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy"> 
    <property name="scope" value="Context"/> 
    <!-- when we hit > 20 inflight exchanges then kick in and suspend the routes --> 
    <property name="maxInflightExchanges" value="20"/> 
    <!-- when we hit lower than 10% of the max = 2 then kick in and resume the routes the default percentage is 70% but in this demo we want a low value --> 
    <property name="resumePercentOfMax" value="10"/> 
    <!-- output throttling activity at WARN level --> 
    <property name="loggingLevel" value="WARN"/> 
</bean> 

編輯1:

如果您需要全局節流,那麼您可以先讓一個消費者讀取消息,如上所述限制所有消息,然後將它們重新發送到另一個隊列,並讓它們重新讀取並處理它們的分佈式消費者。

編輯2:

或者,您也可以實現自己的ThrottlingInflightRoutePolicy訪問中央數據庫保存處理信息。這樣,你不需要一個「單節點主節氣門」。但是,數據庫也可能是單點故障。

+0

限制策略是否可以跨上下文/機器應用?否則,可能會出現單點故障。 – vikingsteve 2014-10-10 11:53:58

+0

@vikingsteve感謝您的問題。看我的*編輯2 *如何處理。 – 2014-10-10 12:15:50

+0

@彼得有趣。我沒有考慮讓其中一位消費者進行限制,但是這會引發很多管理消費者不對稱的開銷。 我不知道ThrottlingInflightRoutePolicy如何實現與我在問題中提到的camelContext.startRoute()或camelContext.resumeRoute()不同的停止/啓動路由,但是由於每個人都強烈支持它,所以我會給它一個鏡頭! – Denise 2014-10-10 13:51:54

0

彼得的問題得到了最好的答案,但我最終的結果是延長了ThrottlingInflightRoutePolicy,關於它是如何工作的沒有很好的解釋,所以我想我會對這個問題進行一些註釋並說明我是如何解決問題的。

public class MyRoutePolicy extends RoutePolicySupport implements CamelContextAware { 

    private CamelContext camelContext; 
    private final Lock lock = new ReentrantLock(); 
    private ContextScopedEventNotifier eventNotifier; 

    @Override 
    public final void setCamelContext(final CamelContext camelContext) { 
     this.camelContext = camelContext; 
    } 

    @Override 
    public final CamelContext getCamelContext() { 
     return this.camelContext; 
    } 

    @Override 
    public final void onExchangeDone(final Route route, final Exchange exchange) { 
     throttle(route); 
    } 

    private void throttle(final Route route) { 
     // this works the best when this logic is executed when the exchange is done 
     Consumer consumer = route.getConsumer(); 

     boolean stop = isRouteMarkedForSuspension(route.getId()) && ((JmsConsumer) route.getConsumer()).isStarted(); 
     if (stop) { 
      try { 
       lock.lock(); 
       stopConsumer(consumer); 
      } catch (Exception e) { 
       handleException(e); 
      } finally { 
       lock.unlock(); 
      } 
     } 

     // reload size in case a race condition with too many at once being invoked 
     // so we need to ensure that we read the most current size and start the consumer if we are already to low 
     boolean start = !isRouteMarkedForSuspension(route.getId()) && ((JmsConsumer) route.getConsumer()).isSuspended(); 
     if (start) { 
      try { 
       lock.lock(); 
       startConsumer(consumer); 
      } catch (Exception e) { 
       handleException(e); 
      } finally { 
       lock.unlock(); 
      } 
     } 
    } 

    @Override 
    protected final void doStart() throws Exception { 
     ObjectHelper.notNull(camelContext, "CamelContext", this); 
     eventNotifier = new ContextScopedEventNotifier(); 
     // must start the notifier before it can be used 
     ServiceHelper.startService(eventNotifier); 
     // we are in context scope, so we need to use an event notifier to keep track 
     // when any exchanges is done on the camel context. 
     // This ensures we can trigger accordingly to context scope 
     camelContext.getManagementStrategy().addEventNotifier(eventNotifier); 
    } 

    @Override 
    protected final void doStop() throws Exception { 
     ObjectHelper.notNull(camelContext, "CamelContext", this); 
     camelContext.getManagementStrategy().removeEventNotifier(eventNotifier); 
    } 

    private class ContextScopedEventNotifier extends EventNotifierSupport { 

     @Override 
     public void notify(final EventObject event) throws Exception { 
      for (Route route : camelContext.getRoutes()) { 
       throttle(route); 
      } 
     } 

     @Override 
     public boolean isEnabled(final EventObject event) { 
      return event instanceof ExchangeCompletedEvent; 
     } 

     @Override 
     protected void doStart() throws Exception { 
      // noop 
     } 

     @Override 
     protected void doStop() throws Exception { 
      // noop 
     } 

     @Override 
     public String toString() { 
      return "ContextScopedEventNotifier"; 
     } 
    } 
} 

所以我加入了RoutePolicy上述所有我的路線,像這樣:

from(uri).routePolicy(routePolicy).process(runner); 

MyRoutePolicy是一個內部類和isRouteMarkedForSuspension在主類中定義。

throttle被擊中在兩個點:被處理後

  • 交換(消息)。這對於確定消費者是否應該暫停很有用。
  • 通過ContextScopedEventNotifier通知事件。這對於確定消費者是否應該恢復很有用。