2017-10-16 61 views
2

我想在集羣中只運行 Akka 調度程序的單個實例。目前我的調度程序在本地運行正常,但在集羣中的行爲不符合預期。該調度程序從數據庫中查詢訂單,並將其推送到 Kafka 主題。
我正在使用 Akka 2.5.6(Java)。我已經閱讀過官方文檔(official Doc),但幫助不大。任何幫助將不勝感激。如何讓 Akka Scheduler 在整個集羣中只運行一個實例?

public class OrderReprocessActor extends UntypedActor { 


    LoggingAdapter log = Logging.getLogger(getContext().system(), this); 
    OrderProcessorJdbcConnection orderProcessorJdbcConnection; 
    private final String SELECT_QUERY_TO_GET_FAILED_ORDER="SELECT * FROM ORDER_HISTORY WHERE ORDER_STATUS = ?"; 
    CommonPropsUtil commonPropsUtil; 

    final Cluster cluster = Cluster.get(getContext().system());   

    public static Props getProps() { 
     return Props.create(OrderReprocessActor.class); 
    } 

    @Inject 
    public OrderReprocessActor(OrderProcessorJdbcConnection orderProcessorJdbcConnection , CommonPropsUtil commonPropsUtil){ 
     this.orderProcessorJdbcConnection = orderProcessorJdbcConnection; 
     this.commonPropsUtil = commonPropsUtil; 
    } 

    @Override 
    public void onReceive(Object message) throws Throwable { 

     String failedStatus = (String) message; 
     List<OrderHistory> failedOrderList = getOrders(failedStatus); 
     pushOrderToKafka(failedOrderList); 
     String intervalSeconds = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.ORDER_REPROCESSOR_SCHEDULER_INTERVAL); 
     if(StringUtils.isNotEmpty(intervalSeconds)) 
     { 
      int interval = Integer.parseInt(intervalSeconds); 
      getContext().system().scheduler().scheduleOnce(Duration.create(interval, TimeUnit.SECONDS), 
        () -> { 
         getSelf().tell(failedStatus, ActorRef.noSender()); 
        }, getContext().system().dispatcher()); 
     } 
    } 

    /** 
    * This method takes the failedOrderList and pushes to Kafka Topic 
    * 
    */ 
    private void pushOrderToKafka(List<OrderHistory> failedOrders) { 

     log.info("Entering pushOrderToKafka()"); 
     String kafkaOrderTopic = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_SUBMIT_ORDER_TOPIC); 
     Properties props = getKafkaProperties(); 
     Producer<String, Order> producer = new KafkaProducer<>(props); 
     for (OrderHistory orderHistory : failedOrders) { 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       Order order = objectMapper.readValue(orderHistory.getOrderData().toString(),Order.class); 
       log.info("******************Order ID..."+orderHistory.getOrderId()); 
       producer.send(new ProducerRecord<String, Order>(kafkaOrderTopic, orderHistory.getOrderId(), order)).get(); 
      } catch (IOException e) { 
       log.error("IOException caught , message="+e.getMessage()); 
      } catch (InterruptedException e) { 
       log.error("InterruptedException caught , message="+e.getMessage()); 
      } catch (ExecutionException e) { 
       log.error("ExecutionException caught , message="+e.getMessage()); 
      } 
     } 
     producer.close(); 
     log.info("Exiting pushOrderToKafka()"); 
    } 

    /** 
    * This method return kafka connection properties 
    * @return 
    */ 
    private Properties getKafkaProperties() { 
     String kafkaBootStrapServers = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_BOOTSTRAP_SERVERS); 
     Properties props = new Properties(); 
     props.put(CommonConstants.BOOTSTRAP_SERVERS, kafkaBootStrapServers); 
     props.put(CommonConstants.KEY_SERIALIZER, VZWCommonConstants.STRING_SERIALIZER); 
     props.put(CommonConstants.VALUE_SERIALIZER, VZWCommonConstants.ORDER_SERIALIZER); 
     return props; 
    } 

    /** 
    *This method get all the failed Order from DB 
    * @return List<OrderReprocessActor.OrderHistory> 
    * @throws SQLException 
    */ 
    private List<OrderReprocessActor.OrderHistory> getOrders(String failStatus) throws SQLException { 
     log.info("Entering getAllFailedOrdersFromDB()"); 
     Connection connection = orderProcessorJdbcConnection.getConnection(); 

     try { 
      PreparedStatement pstmt = connection.prepareStatement(SELECT_QUERY_TO_GET_FAILED_ORDER); 
      pstmt.setString(1,failStatus); 
      ResultSet rersultSet = pstmt.executeQuery(); 
      return getOrdersFromResultSet(rersultSet); 
     }catch (SQLException e){ 
      log.error("SQLException caught while fetching failed Order from DB"); 
      log.error(e.getMessage()); 
     }finally { 
       orderProcessorJdbcConnection.releaseConnection(connection); 
     } 
     log.info("Exiting getAllFailedOrdersFromDB()"); 
     return null; 
    } 

    /** 
    * Retrives order from sql result set 
    * @param rersultSet 
    * @return 
    * @throws SQLException 
    */ 
    private List<OrderHistory> getOrdersFromResultSet(ResultSet rersultSet) throws SQLException { 
     List<OrderReprocessActor.OrderHistory> failedOrderList = new ArrayList<>(); 
     while(rersultSet.next()){ 
      String orderId = rersultSet.getString("order_id"); 
      String orderData = rersultSet.getString("order_data"); 
      OrderHistory orderHistory = new OrderHistory(); 
      orderHistory.setOrderId(orderId); 
      orderHistory.setOrderData(orderData); 
      failedOrderList.add(orderHistory); 
     } 
     return failedOrderList; 
    } 

    public static class OrderHistory{ 

     private String orderId; 
     private String orderData; 
     public String getOrderId() { 
      return orderId; 
     } 
     public void setOrderId(String orderId) { 
      this.orderId = orderId; 
     } 
     public String getOrderData() { 
      return orderData; 
     } 
     public void setOrderData(String orderData) { 
      this.orderData = orderData; 
     } 
    } 


} 

回答

0

把你的 OrderReprocessActor 做成集羣單例(cluster singleton)。文檔中寫道:

集羣單例模式由 akka.cluster.singleton.ClusterSingletonManager 實現。它在所有集羣節點(或一組帶有特定角色標記的節點)中管理一個單例 actor 實例。ClusterSingletonManager 本身是一個 actor,應該在集羣中的所有節點、或具有指定角色的所有節點上啓動。實際的單例 actor 由最老(oldest)節點上的 ClusterSingletonManager 根據所提供的 Props 創建爲其子 actor。ClusterSingletonManager 確保在任何時間點至多只有一個單例實例在運行。

+0

嗨,我已經看過鏈接裏提到的內容,但我不明白如何把它與我現有的代碼整合起來。理論上我明白這個做法,但在代碼層面我還不清楚。 –

-1

集羣單例可以這樣創建:

// Singleton-manager settings created from the actor system.
final ClusterSingletonManagerSettings settings = 
    ClusterSingletonManagerSettings.create(system); 

// Start the singleton manager as a top-level actor named "consumer" (path "/user/consumer").
// Arguments: the Props of the actor to run as the singleton, the message passed as
// TestSingletonMessages.end() (presumably the termination message - confirm against the
// ClusterSingletonManager.props documentation), and the settings above.
// NOTE(review): Consumer, queue, testActor and TestSingletonMessages are not declared in this
// snippet; replace them with your own actor class, its constructor arguments and message.
system.actorOf(
    ClusterSingletonManager.props(
    Props.create(Consumer.class,() -> new Consumer(queue, testActor)), 
    TestSingletonMessages.end(), 
    settings), 
    "consumer"); 

Akka 會確保該 actor 在整個集羣中只在一個節點(最老的節點)上創建。

要與單例 actor 通信,需要根據單例管理器的路徑創建一個代理(proxy),並通過該代理髮送消息:

// Proxy settings created from the actor system.
ClusterSingletonProxySettings proxySettings = 
    ClusterSingletonProxySettings.create(system); 

// Create a proxy actor named "consumerProxy" that is given the path "/user/consumer",
// i.e. the name under which the singleton manager was started.
// Presumably messages told to this ref are forwarded to the current singleton - confirm
// against the ClusterSingletonProxy documentation.
ActorRef proxy = 
    system.actorOf(ClusterSingletonProxy.props("/user/consumer", proxySettings), 
    "consumerProxy"); 

這些示例改編自官方文檔(the docs)。

+0

你能告訴我哪個包包含Consumer.class,還有什麼類型的隊列?我的代碼中沒有任何隊列 –

+0

只是複製粘貼代碼並不是我期望從知名用戶那裏得到的東西。 –

+0

您已經知道如何調度消息;再加上關於如何創建集羣單例並向其發送消息的這些細節,應該能清楚如何把它們串聯在一起。您在問題中並沒有具體說明它是怎樣不符合預期的,補充這一點也許能幫您得到更好的答案。 – Synesso