0
我正在嘗試使用Akka和Camel競爭事件消費者實現。我使用Akka 2.3.2和Camel 5.8.0。我將駱駝連接到ActiveMQ代理,並使用生產者從另一端生成消息。在以下代碼中,EventManager是創建消費者池的主設備,EventProcessor是消息處理參與者。Akka演員Java競爭消費者
EventManager.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.japi.Creator;
import akka.routing.RoundRobinPool;
public class EventManager {
private final ActorSystem akkaSystem;
private CamelContext camelContext = null;
private ActorRef workRouter;
public EventManager(ActorSystem system) {
akkaSystem = system;
initialize();
}
public void initialize() {
Camel camel = CamelExtension.get(akkaSystem);
camelContext = camel.context();
ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent("tcp://localhost:61616");
activemqComponent.setDeliveryPersistent(false);
camelContext.addComponent("activemq",activemqComponent);
int numOfWorkers = 5;
// distributing the message processing across a pool of 5 actors
ActorRef workRouter =
akkaSystem.actorOf(new RoundRobinPool(numOfWorkers).props(Props.create(EventProcessor.class)),
"workRouter");
}
}
EventProcessor.java
import org.apache.log4j.Logger;
import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
public class EventProcessor extends UntypedConsumerActor{
private static final Logger LOGGER = Logger.getLogger(EventProcessor.class);
public EventProcessor() {
}
public void onReceive(Object message) {
if(message instanceof CamelMessage) {
CamelMessage camelMessage = (CamelMessage) message;
String body = camelMessage.getBodyAs(String.class, getCamelContext());
LOGGER.info("Message handled by :" +this.getSelf().path().name());
LOGGER.info("Message body:" + body);
}
}
public boolean autoAck() {
return true;
}
public String getEndpointUri() {
return "activemq:queue:dest";
}
}
我看到的問題是,這些消息似乎是由一個演員被消耗,沒有得到過游泳池分佈。我是否需要創建單獨的駱駝路線來分發?我也想在不同的物理節點之間分配處理。欣賞你的投入和最佳實踐。
嗨克勞斯,我打算說activemq-camel 5.8.0。我似乎已經解決了。問題是我正在分開運行生產者和消費者代碼。即用消息加載隊列,斷開生產者,然後啓動消費者消費消息。但是,當我同時運行生產者和消費者時,這些消息正在分散。感謝您的建議! – user3364247