2012-08-14 77 views
2

我對駱駝非常陌生,並且一直在努力瞭解如何在特定場景中使用駱駝。 在這種情況下,有一個(基於Java的)代理程序會不時生成操作。我需要一個事件驅動的消費者來獲得這些事件的通知。這些事件將被路由到「文件」製作者(暫時)。如何在駱駝中實現事件驅動消費者

在駱駝書中,示例是針對投票的消費者。我無法找到針對事件驅動型消費者的通用解決方案。 我碰到一個類似的實現來抓JMX:

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 

JMXEndpoint jmxEndpoint;  
public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 
    super(endpoint, processor); 
    this.jmxEndpoint = endpoint; 
} 

public void handleNotification(Notification notification, Object handback) { 
    try { 
     getProcessor().process(jmxEndpoint.createExchange(notification)); 
    } catch (Throwable e) { 
     handleException(e); 
    } 
} 

}

這裏,每當JMX通知到達的handleNotification被調用。

我相信我必須做類似的事情才能讓我的消費者在代理生成操作時得到通知。但是,上面的handleNotification方法是JMX特有的。該網頁上說:「在實現您自己的事件驅動的消費者時,您必須確定一個類似的事件偵聽器方法,以便在您的自定義消費者中實現。」

我想知道:如何識別類似的事件偵聽器,以便我的客戶在我的代理有動作時得到通知。

任何意見/鏈接到網頁非常感謝。

回答

4

事件驅動是駱駝是什麼。

任何路由實際上是一個事件監聽器。

給出的路線:

from("activemq:SomeQueue"). 
    bean(MyClass.class); 

public class MyBean{ 
    public void handleEvent(MyEventObject eventPayload){ // Given MyEventObject was sent to this "SomeQueue". 
    // whatever processing. 
    } 
} 

這將使一個事件驅動消費者。如何發送活動呢?如果您的應用程序中嵌入了駱駝,並從您的事件動作生成器訪問CamelContext,那麼您可以從中獲取Producer Template,並將您的事件觸發到您在Camel中定義的任何端點,例如「seda:SomeQueue」。否則,如果您的Camel實例在另一個服務器或實例中運行,而不是您的應用程序,那麼您應該使用其他一些傳輸方式而不是SEDA。最好是JMS,但其他人也會這樣做,挑選。 ActiveMQ是我的最愛。您可以輕鬆地啓動嵌入式ActiveMQ的實例(JVM內),並通過它連接到駱駝:

camelContext.addComponent("activemq", activeMQComponent("vm://localhost")); 
+0

非常感謝您的回覆。我特別想知道如何發送活動。感謝您解釋這一點。將盡快嘗試。 (我是JEE的新手,所以可能需要一段時間才能弄清楚這一切) – sura 2012-08-14 20:07:09

+0

好。更新了關於不同傳輸的答案,即嵌入了Seda和ActiveMQ。 – 2012-08-14 21:40:17

4

我知道這是一個老問題,但我一直在用它掙扎,只是想我要記錄我的發現爲其他人尋找答案。

當你創建(延長DefaultEndpoint)重寫下面的方法來創建消費者端點類:

public Consumer createConsumer(Processor processor) 

在你的消費者的話,你可以使用一個處理器 - 在此處理器上調用「過程」將創建一個事件並觸發路線。

例如,假設您有一些Java API偵聽消息,並且有某種Listener。在我的情況下,監聽器把收到的郵件到的LinkedBlockingQueue,和我的消費者「DOSTART」的方法是這樣的(添加你自己的錯誤處理):

@Override 
protected void doStart() throws Exception { 
    super.doStart(); 

    // Spawn a new thread that submits exchanges to the Processor 
    Runnable runnable = new Runnable() { 
     @Override 
     public void run() { 
      while(true) { 
       IMessage incomingMessage = myLinkedBlockingQueue.take(); 
       Exchange exchange = getEndpoint().createExchange(); 
       exchange.getIn().setBody(incomingMessage); 
       myProcessor.process(exchange); 
      } 
     } 
    }; 
    new Thread(runnable).start(); 
} 

現在我可以把創建創建端點組件該消費者在我CamelContext,並使用它像這樣:

from("mycomponent:incoming").to("log:messages"); 

和日誌消息時將觸發每一個新的消息從Java API到達時間。

希望能幫助別人!

+0

是的,它幫了我很多。 – vels4j 2016-02-18 14:51:37

+0

super.doStart();是必須的? – vels4j 2016-02-18 14:53:44

+0

可能不是,我想我的IDE會自動將它放入。如果您查看DefaultEndpoint中的「doStart()」方法,它只是「// noop」,所以我認爲您可以安全地將其刪除 – Matt 2016-02-18 15:45:59