我想在集羣中只運行Akka調度程序的單個實例。目前我的調度程序在本地運行正常,但在集羣中的行爲不符合預期。調度程序從數據庫中選取訂單並推送到Kafka主題。
我正在使用Akka 2.5.6(Java)。我已經閱讀了官方文檔(official doc),但沒有得到太多幫助。任何幫助將不勝感激。如何在集羣中只運行一個Akka Scheduler實例?
/**
 * Actor that re-submits orders from the {@code ORDER_HISTORY} table to a Kafka topic.
 *
 * <p>Each {@link String} message is treated as the order status to select. The actor loads the
 * matching rows, publishes every order to the configured Kafka topic and, when a scheduler
 * interval is configured, sends the same status back to itself after that many seconds so the
 * cycle repeats.
 *
 * <p>NOTE(review): nothing in this class restricts it to one node. If exactly one instance must
 * run across an Akka cluster, the usual approach is to start these {@link Props} through
 * {@code ClusterSingletonManager} (akka-cluster-tools) rather than with a plain
 * {@code actorOf} on every node - confirm against the deployment.
 */
public class OrderReprocessActor extends UntypedActor {

    /** Selects every order history row that carries the given status. */
    private static final String SELECT_QUERY_TO_GET_FAILED_ORDER =
            "SELECT * FROM ORDER_HISTORY WHERE ORDER_STATUS = ?";

    /** Shared JSON mapper; reused instead of being re-created for every order. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    OrderProcessorJdbcConnection orderProcessorJdbcConnection;
    CommonPropsUtil commonPropsUtil;
    // Not used by this class; kept because Cluster.get(...) is evaluated at construction time.
    final Cluster cluster = Cluster.get(getContext().system());

    /**
     * Creates {@link Props} for this actor without constructor arguments.
     *
     * <p>NOTE(review): the only constructor takes two arguments, so this variant presumably
     * relies on an external injection mechanism; otherwise prefer
     * {@link #getProps(OrderProcessorJdbcConnection, CommonPropsUtil)}.
     *
     * @return props describing this actor
     */
    public static Props getProps() {
        return Props.create(OrderReprocessActor.class);
    }

    /**
     * Creates {@link Props} that pass the collaborators straight to the constructor.
     *
     * @param orderProcessorJdbcConnection source of JDBC connections
     * @param commonPropsUtil lookup for configuration values
     * @return props describing this actor
     */
    public static Props getProps(OrderProcessorJdbcConnection orderProcessorJdbcConnection,
                                 CommonPropsUtil commonPropsUtil) {
        return Props.create(OrderReprocessActor.class, orderProcessorJdbcConnection, commonPropsUtil);
    }

    /**
     * @param orderProcessorJdbcConnection source of JDBC connections
     * @param commonPropsUtil lookup for configuration values
     */
    @Inject
    public OrderReprocessActor(OrderProcessorJdbcConnection orderProcessorJdbcConnection , CommonPropsUtil commonPropsUtil){
        this.orderProcessorJdbcConnection = orderProcessorJdbcConnection;
        this.commonPropsUtil = commonPropsUtil;
    }

    /**
     * Runs one reprocessing cycle for the status carried by the message and schedules the next.
     *
     * <p>Messages that are not a {@link String} are passed to {@code unhandled} instead of
     * failing with a {@link ClassCastException}.
     *
     * @param message the order status to reprocess
     * @throws Throwable any failure of the cycle; the next run is scheduled regardless
     */
    @Override
    public void onReceive(Object message) throws Throwable {
        if (!(message instanceof String)) {
            unhandled(message);
            return;
        }
        String failedStatus = (String) message;
        try {
            List<OrderHistory> failedOrderList = getOrders(failedStatus);
            pushOrderToKafka(failedOrderList);
        } finally {
            // Schedule even when this cycle failed, otherwise one error ends the periodic job.
            scheduleNextRun(failedStatus);
        }
    }

    /**
     * Sends {@code failedStatus} back to this actor after the configured number of seconds.
     * Does nothing when no interval is configured or the configured value is not an integer.
     */
    private void scheduleNextRun(String failedStatus) {
        String intervalSeconds = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.ORDER_REPROCESSOR_SCHEDULER_INTERVAL);
        if (StringUtils.isEmpty(intervalSeconds)) {
            return;
        }
        int interval;
        try {
            interval = Integer.parseInt(intervalSeconds.trim());
        } catch (NumberFormatException e) {
            log.error(e, "Invalid scheduler interval, next run not scheduled, value=" + intervalSeconds);
            return;
        }
        // Let the scheduler deliver the message itself rather than closing over actor state.
        getContext().system().scheduler().scheduleOnce(
                Duration.create(interval, TimeUnit.SECONDS),
                getSelf(),
                failedStatus,
                getContext().system().dispatcher(),
                ActorRef.noSender());
    }

    /**
     * Publishes every order in {@code failedOrders} to the configured Kafka topic, waiting for
     * each send to complete. A failure on one order is logged and the remaining orders are still
     * processed; an interrupt stops the loop.
     *
     * @param failedOrders orders to publish; {@code null} or empty means nothing to do
     */
    private void pushOrderToKafka(List<OrderHistory> failedOrders) {
        log.info("Entering pushOrderToKafka()");
        if (failedOrders == null || failedOrders.isEmpty()) {
            // Nothing to send, so do not open a producer at all.
            log.info("Exiting pushOrderToKafka()");
            return;
        }
        String kafkaOrderTopic = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_SUBMIT_ORDER_TOPIC);
        Properties props = getKafkaProperties();
        // try-with-resources closes the producer even if an unexpected exception escapes.
        try (Producer<String, Order> producer = new KafkaProducer<>(props)) {
            for (OrderHistory orderHistory : failedOrders) {
                String orderData = orderHistory.getOrderData();
                if (orderData == null) {
                    log.error("Order data missing, skipping order id=" + orderHistory.getOrderId());
                    continue;
                }
                try {
                    Order order = OBJECT_MAPPER.readValue(orderData, Order.class);
                    log.info("******************Order ID..."+orderHistory.getOrderId());
                    producer.send(new ProducerRecord<>(kafkaOrderTopic, orderHistory.getOrderId(), order)).get();
                } catch (IOException e) {
                    log.error(e, "IOException caught , message="+e.getMessage());
                } catch (InterruptedException e) {
                    // Restore the flag and stop; further blocking sends would fail the same way.
                    Thread.currentThread().interrupt();
                    log.error(e, "InterruptedException caught , message="+e.getMessage());
                    break;
                } catch (ExecutionException e) {
                    log.error(e, "ExecutionException caught , message="+e.getMessage());
                }
            }
        }
        log.info("Exiting pushOrderToKafka()");
    }

    /**
     * Builds the Kafka producer configuration from the configured bootstrap servers and the
     * serializer constants.
     *
     * @return producer properties
     */
    private Properties getKafkaProperties() {
        String kafkaBootStrapServers = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_BOOTSTRAP_SERVERS);
        Properties props = new Properties();
        props.put(CommonConstants.BOOTSTRAP_SERVERS, kafkaBootStrapServers);
        props.put(CommonConstants.KEY_SERIALIZER, VZWCommonConstants.STRING_SERIALIZER);
        props.put(CommonConstants.VALUE_SERIALIZER, VZWCommonConstants.ORDER_SERIALIZER);
        return props;
    }

    /**
     * Loads all orders whose status equals {@code failStatus}.
     *
     * @param failStatus status value bound to the query parameter
     * @return the matching orders; an empty list (never {@code null}) when the query fails
     * @throws SQLException if obtaining or releasing the connection fails
     */
    private List<OrderReprocessActor.OrderHistory> getOrders(String failStatus) throws SQLException {
        log.info("Entering getAllFailedOrdersFromDB()");
        Connection connection = orderProcessorJdbcConnection.getConnection();
        try (PreparedStatement pstmt = connection.prepareStatement(SELECT_QUERY_TO_GET_FAILED_ORDER)) {
            pstmt.setString(1, failStatus);
            try (ResultSet resultSet = pstmt.executeQuery()) {
                return getOrdersFromResultSet(resultSet);
            }
        } catch (SQLException e) {
            log.error(e, "SQLException caught while fetching failed Order from DB");
            // Empty list instead of null so the caller can iterate safely.
            return new ArrayList<>();
        } finally {
            orderProcessorJdbcConnection.releaseConnection(connection);
            log.info("Exiting getAllFailedOrdersFromDB()");
        }
    }

    /**
     * Maps each row of the result set to an {@link OrderHistory} using the {@code order_id} and
     * {@code order_data} columns.
     *
     * @param rersultSet result set positioned before the first row
     * @return one entry per row, in result set order
     * @throws SQLException if reading the result set fails
     */
    private List<OrderHistory> getOrdersFromResultSet(ResultSet rersultSet) throws SQLException {
        List<OrderReprocessActor.OrderHistory> failedOrderList = new ArrayList<>();
        while (rersultSet.next()) {
            OrderHistory orderHistory = new OrderHistory();
            orderHistory.setOrderId(rersultSet.getString("order_id"));
            orderHistory.setOrderData(rersultSet.getString("order_data"));
            failedOrderList.add(orderHistory);
        }
        return failedOrderList;
    }

    /** Id and serialized payload of one row read from {@code ORDER_HISTORY}. */
    public static class OrderHistory{
        private String orderId;
        private String orderData;

        public String getOrderId() {
            return orderId;
        }

        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        public String getOrderData() {
            return orderData;
        }

        public void setOrderData(String orderData) {
            this.orderData = orderData;
        }
    }
}
嗨,我已經看過鏈接中提到的問題。但我無法理解如何把它與我現有的代碼整合。從理論上講,我明白這種做法,但在代碼層面,我還不清楚。 –