2017-02-20 47 views
1

我下面從akka.io容錯http://doc.akka.io/docs/akka/current/java/fault-tolerance.html .I代碼reference.My要求如下采取這種代碼重試相同的消息與定義的間隔時間定義的數字: 讓我們假設演員在消息 上崩潰,並由其主管重新啓動。然後他開始在他的郵箱中處理下一個 消息。這導致崩潰的消息是 「dropped'.But我要處理的時間的特定數目相同的動作(假定3次),在它們之間限定的間隔(假定1秒)。如何做到這一點使用阿卡監督。實際上是通過演員我想檢查特定服務的API是否運行(即給予一定的例外)。所以,如果有一個特定的嘗試任何異常,(假設404未找到),然後重新發送郵件到失敗的工人 直到達到supervisorStrategy指定的maxNrOfRetries。如果工作人員失敗了「maxNrOfRetries」次,那麼只需記錄「爲此xx消息達到的最大嘗試次數」。如何在java中執行此操作。使用演員監督,如何在發生故障時

我的上司類:

public class Supervisor extends UntypedActor { 


private static SupervisorStrategy strategy = 

new OneForOneStrategy(3, Duration.create("1 minute"), 
    new Function<Throwable, Directive>() { 
    @Override 
    public Directive apply(Throwable t) { 
     if (t instanceof Exception) { 
     return restart(); 
     }else if (t instanceof IllegalArgumentException) { 
     return stop(); 
     } else { 
     return escalate(); 
     } 
    } 
    }); 

@Override 
public SupervisorStrategy supervisorStrategy() { 
return strategy; 


} 
public void onReceive(Object o) { 
if (o instanceof Props) { 
    getSender().tell(getContext().actorOf((Props) o), getSelf()); 
} else { 
    unhandled(o); 
} 


} 
} 

子類:

public class Child extends UntypedActor { 


    public void onReceive(Object o) throws Exception { 
if (o instanceof String) { 
Object response = someFunction((String) message);//this function returns either successfull messgae as string or exception 
if(response instanceOf Exception) { 
    throw (Exception) response; 
    } 
    else 
    getSender().tell(response, getSelf()) 
}else { 
    unhandled(o); 
} 


} 

} 

創建演員:

Props superprops = Props.create(Supervisor.class); 
ActorRef supervisor = system.actorOf(superprops, "supervisor"); 
ActorRef child = (ActorRef) Await.result(ask(supervisor, 
Props.create(Child.class), 5000), timeout); 
child.tell("serVice_url", ActorRef.noSender()); 

因爲我要重複的過程中如果失敗occurs.But它的SERVICE_URL沒有發生。如果寫在creatng演員作爲child.tell("serVice_url_2", ActorRef.noSender());然後這條線exucted下一行,但我要處理的相同的操作(其中發生故障)的時間的特定數目(假定3次),在它們之間限定的間隔。 請指導我做到這一點。

回答

0

我想我已經開發出了一種方法。儘管我仍然需要在生產層面上做一個測試。我在下面寫下這個答案,因爲它可能對嘗試實現相同目標的人有所幫助。如果有人發現更好他/她受到歡迎。 這裏要提一下,通過這種方法Supervisor在一定的時間範圍內處理與發生故障的消息相同的動作(假設3次)。我無法定義他們。 這裏是代碼。主管類。

public class MyUntypedActor extends UntypedActor { 
//here I have given Max no retrilas as 10.I will controll this number from logic as per my own requirements.But user given number of retrials can not exceed 10. 
private static SupervisorStrategy strategy = new AllForOneStrategy(10, Duration.create(5, TimeUnit.MINUTES), 
     new Function<Throwable, SupervisorStrategy.Directive>() { 
      @Override 
      public SupervisorStrategy.Directive apply(Throwable t) { 
       if (t instanceof Exception) { 
        //System.out.println("exception" + "*****" + t.getMessage() + "***" + t.getLocalizedMessage()); 
        return restart(); 
       } else if (t instanceof NullPointerException) { 
        return restart(); 
       } else if (t instanceof IllegalArgumentException) { 
        return stop(); 
       } else { 
        return escalate(); 
       } 
      } 
     }); 

@Override 
public SupervisorStrategy supervisorStrategy() { 
    return strategy; 
} 

public void onReceive(Object o) { 
    if (o instanceof Props) { 
     getSender().tell(getContext().actorOf((Props) o), getSelf()); 
    } else { 
     unhandled(o); 
    } 
} 
} 

孩子類,我們將寫我們的邏輯。

public class Child extends UntypedActor { 
//Through preRestart it will push the message for which exception occured before the restart of the child 
@Override 
public void preRestart(final Throwable reason, final scala.Option<Object> message) throws Exception { 
    System.out.println("reStarting :::" + message.get()); 
    SetRules.setRemainingTrials(SetRules.remainingTrials + 1); 
    getSelf().tell(message.get(), getSender()); 
}; 

public void onReceive(Object o) throws Exception { 

    if (o instanceof Exception) { 
     throw (Exception) o; 
    } else if (o instanceof Integer) { 
    } else if (o.equals("get")) { 
     getSender().tell("get", getSelf()); 
    } else if (o instanceof String) { 

     try { 
      // here either we can write our logic directly or for a better 
      // approach can call a function where the logic will be excuted. 
      getSender().tell("{\"meggase\":\"Succesfull after " + SetRules.remainingTrials + " retrials\"}", 
        getSelf()); 
     } catch (Exception ex) { 
      if (SetRules.remainingTrials == SetRules.noOfRetries) { 
       getSender().tell("{\"meggase\":\"Failed to connect after " + SetRules.noOfRetries + " retrials\"}", 
         getSelf()); 
      } else { 
       Exception value1 = ex; 
       throw (Exception) value1; 
      } 
     } 
    } else { 
     unhandled(o); 
    } 
} 
} 

的SetRules類有相關信息的用戶提供noOfRetrials並在重試的每個沙爹通過remainingTrials

public class SetRules { 

public static int noOfRetries; 
public static int remainingTrials; 

public SetRules(int noOfRetries, int remainingTrials) { 
    super(); 
    SetRules.noOfRetries = noOfRetries; 
    SetRules.remainingTrials = remainingTrials; 
} 

public int getRemainingTrials() { 
    return remainingTrials; 
} 

public static void setRemainingTrials(int remainingTrials) { 
    SetRules.remainingTrials = remainingTrials; 
} 
} 

存儲有關重試次數的信息,現在,讓我們創建的演員。

Props superprops = Props.create(MyUntypedActor.class); 
SetRules setRules=new SetRules(3,0); 
ActorSystem system = ActorSystem.create("helloakka"); 
ActorRef supervisor = system.actorOf(superprops, "supervisor"); 
ActorRef child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), Duration.create(5, "minutes")); 
Future<Object> future = Patterns.ask(child, service_Url, new Timeout(Duration.create(5, "minutes"))); 
Object result = Await.result(future, Duration.create(5, "minutes")); 
System.out.println(result); 
+0

我問[類似的問題(https://stackoverflow.com/questions/42319399/akka-supervisor-delayed-restart-without-losing-message)幾乎在同一時間比你;我也在審查你的。使用此解決方案,您確實可以將消息放回,但無法定義重新啓動之間的時間(回退)。 –

+0

是的,我沒有能夠定義他們之間的時間間隔。任何建議如何做到這一點?@ɐuıɥɔɐɯ –