2014-05-02 113 views
0

我正在嘗試使用Akka和Camel競爭事件消費者實現。我使用Akka 2.3.2和Camel 5.8.0。我將駱駝連接到ActiveMQ代理,並使用生產者從另一端生成消息。在以下代碼中,EventManager是創建消費者池的主設備,EventProcessor是消息處理參與者。Akka演員Java競爭消費者

EventManager.java

import org.apache.activemq.ActiveMQConnection; 
    import org.apache.activemq.camel.component.ActiveMQComponent; 
    import org.apache.camel.CamelContext; 

    import akka.actor.ActorRef; 
    import akka.actor.ActorSystem; 
    import akka.actor.Props; 
    import akka.actor.UntypedActor; 

    import akka.camel.Camel; 
    import akka.camel.CamelExtension; 
    import akka.japi.Creator; 
    import akka.routing.RoundRobinPool; 

    public class EventManager { 




     private final ActorSystem akkaSystem; 

     private CamelContext camelContext = null; 

     private ActorRef workRouter; 

     public EventManager(ActorSystem system) { 
      akkaSystem = system; 

      initialize(); 
     } 

     public void initialize() { 

      Camel camel = CamelExtension.get(akkaSystem); 

      camelContext = camel.context(); 

      ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent("tcp://localhost:61616"); 
      activemqComponent.setDeliveryPersistent(false); 
      camelContext.addComponent("activemq",activemqComponent); 


      int numOfWorkers = 5; 

// distributing the message processing across a pool of 5 actors 
      ActorRef workRouter = 
         akkaSystem.actorOf(new RoundRobinPool(numOfWorkers).props(Props.create(EventProcessor.class)), 
         "workRouter"); 

     } 

    } 

EventProcessor.java

import org.apache.log4j.Logger; 



import akka.actor.UntypedActor; 
import akka.camel.CamelMessage; 
import akka.camel.javaapi.UntypedConsumerActor; 

public class EventProcessor extends UntypedConsumerActor{ 

    private static final Logger LOGGER = Logger.getLogger(EventProcessor.class); 
    public EventProcessor() { 

    } 

    public void onReceive(Object message) { 
     if(message instanceof CamelMessage) { 
      CamelMessage camelMessage = (CamelMessage) message; 
      String body = camelMessage.getBodyAs(String.class, getCamelContext()); 
       LOGGER.info("Message handled by :" +this.getSelf().path().name()); 

       LOGGER.info("Message body:" + body); 
     } 


    } 

    public boolean autoAck() { 
     return true; 
    } 
    public String getEndpointUri() { 
     return "activemq:queue:dest"; 
    } 

} 

我看到的問題是,這些消息似乎是由一個演員被消耗,沒有得到過游泳池分佈。我是否需要創建單獨的駱駝路線來分發?我也想在不同的物理節點之間分配處理。欣賞你的投入和最佳實踐。

回答

0

嘗試在端點上

return "activemq:queue:dest?asyncConsumer=true"; 

在AMQ端點

return "activemq:queue:dest?concurrentConsumers=50"; 

..或啓用異步消費者設置併發消費者也似乎你得到的版本信息是錯誤的。沒有駱駝版本5.8。我認爲這是AMQ版本。

+0

嗨克勞斯,我打算說activemq-camel 5.8.0。我似乎已經解決了。問題是我正在分開運行生產者和消費者代碼。即用消息加載隊列,斷開生產者,然後啓動消費者消費消息。但是,當我同時運行生產者和消費者時,這些消息正在分散。感謝您的建議! – user3364247