2016-02-18 108 views
0

我是Apache Camel的新手。在hp不間斷中,有一個接收器接收由事件管理器生成的事件,假設爲流。我的目標是設置一個消費者終端,它接收傳入消息並通過Camel進行處理。Apache Camel創建消費者組件

另一個端點我只需要將它寫入日誌。從我的研究,我明白,對於消費者終點,我需要創建自己的組件和配置會像

from("myComp:receive").to("log:net.javaforge.blog.camel?level=INFO") 

這裏是從事件系統接收消息,我的代碼段。

Receive receive = com.tandem.ext.guardian.Receive.getInstance(); 
    byte[] maxMsg = new byte[500]; // holds largest possible request 
    short errorReturn = 0; 
    do { // read messages from $receive until last close 
     try { 
      countRead = receive.read(maxMsg, maxMsg.length); 
      String receivedMessage=new String(maxMsg, "UTF-8"); 
      //Here I need to handover receivedMessage to camel 

     } catch (ReceiveNoOpeners ex) { 
      moreOpeners = false; 
     } catch(Exception e) { 
      moreOpeners = false; 
     } 
    } while (moreOpeners); 

有人可以引導一些提示如何使這作爲消費者。

回答

1

10'000英尺視圖是這樣的:

您需要從實施組件開始。最簡單的入門方法是延長org.apache.camel.impl.DefaultComponent。你唯一要做的就是覆蓋DefaultComponent::createEndpoint(..)。很明顯,它所做的是創建您的端點。

所以接下來你需要的是實現你的端點。爲此延伸org.apache.camel.impl.DefaultEndpoint。覆蓋最低DefaultEndpoint::createConsumer(Processor)以創建自己的消費者。

最後但並非最不重要的是你需要實現消費者。同樣,最好的辦法是延長org.apache.camel.impl.DefaultConsumer。消費者是您的代碼必須去生成您的消息的地方。通過構造函數,您將收到對端點的引用。使用端點引用創建一個新的Exchange,填充它並沿路徑發送它。沿

Exchange ex = endpoint.createExchange(ExchangePattern.InOnly); 
setMyMessageHeaders(ex.getIn(), myMessagemetaData); 
setMyMessageBody(ex.getIn(), myMessage); 

getAsyncProcessor().process(ex, new AsyncCallback() { 
    @Override 
    public void done(boolean doneSync) { 
     LOG.debug("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously")); 
    } 
}); 

線的東西,我建議你選擇一個簡單的組件(DirectComponent?)作爲一個榜樣。

+0

感謝您的回答,我創建了'MessageComponent','MessageEndpoint','MessageProducer'和'MessageConsumer'。 'MessageConsumer'擴展了'DefaultConsumer'。 找不到處理我的消息的方法。我是否需要將其添加到構造函數本身中? – vels4j

+0

重寫'DefaultConsumer'的doStart()'和doStop()'方法來啓動/停止外部消息訂閱/輪詢。在我的情況下,我在消費者中實現了一個回調方法,每當我收到外部消息時都會調用它。在這個例子中,我創建了header和body,並將其設置爲一個新的'Exchange',如上所示,並且消息在路由下發送。 – Ralf

+0

完成,工作正常,謝謝 – vels4j

0

因此添加我自己的消費者組件可能有助於某人。

public class MessageConsumer extends DefaultConsumer { 

private final MessageEndpoint endpoint; 

private boolean moreOpeners = true; 

public MessageConsumer(MessageEndpoint endpoint, Processor processor) { 
    super(endpoint, processor); 
    this.endpoint = endpoint; 

} 


@Override 
protected void doStart() throws Exception { 

    int countRead=0; // number of bytes read 

    do { 
     countRead++; 
      String msg = String.valueOf(countRead)+" "+System.currentTimeMillis(); 
      Exchange ex = endpoint.createExchange(ExchangePattern.InOnly); 
      ex.getIn().setBody(msg); 
      getAsyncProcessor().process(ex, new AsyncCallback() { 
       @Override 
       public void done(boolean doneSync) { 
        log.info("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously")); 
       } 
      }); 
      // This is an echo server so echo request back to requester  

    } while (moreOpeners); 
} 

@Override 
protected void doStop() throws Exception { 
    moreOpeners = false; 
    log.debug("Message processor is shutdown"); 
} 

}