2016-04-23 66 views
0

我是Spring AOP的新手,並且已經嘗試了一下。Spring AOP - 正確配置重試建議

我正在嘗試設置重試&通過Spring AOP爲我的項目中的一個限制器。 用例如下: -

  1. 檢查TPS是否可用。如果不是,扔ThrottledException
  2. 如果一個ThrottledException被拋出,Retry

我遇到的問題是:此限制&重試組合運行到無限循環(如果TPS = 0)。也就是說,重試不會在'x'嘗試後停止。

我節流Interceptor是(高級別)是這樣的:

@Before("<pointcut>") 
public void invoke() throws ThrottlingException { 
     if (throttler.isThrottled(throttleKey)) { 
      throw new ThrottlingException("Call Throttled"); 
    } 
} 

我重試Interceptor是這樣的:

@AfterThrowing(pointcut="execution(* com.company.xyz.method())", throwing="exception") 
public Object invoke(JoinPoint jp, ThrottlingException exception) throws Throwable { 
    return RetryingCallable.newRetryingCallable(new Callable<Object>() { 

     @Override 
     public Object call() throws Exception { 
       MethodSignature signature = (MethodSignature) p.getSignature(); 
       Method method = signature.getMethod(); 
       return method.invoke(jp.getThis(), (Object[]) null); 
     } 

    }, retryPolicy).call(); 
} 

這裏RetryingCallable是有人寫了一個簡單的實現(內部庫在我的公司),並採用RetryAdvice

我的相關彈簧的配置如下:這裏

<bean id="retryInterceptor" class="com.company.xyz.RetryInterceptor"> 
    <constructor-arg index="0"><ref bean="retryPolicy"/></constructor-arg> 
</bean> 


<bean id="throttlingInterceptor" class="com.company.xyz.ThrottlingInterceptor"> 
    <constructor-arg><value>throttleKey</value></constructor-arg> 
</bean> 
<context:component-scan base-package="com.company.xyz"> 
    <context:include-filter type="annotation" expression="org.aspectj.lang.annotation.Aspect"/> 
</context:component-scan> 
<aop:aspectj-autoproxy/> 

的問題,因爲我看到的是,在每個ThrottlingExceptionRetry Advice被應用,而不是以前的一個是進入影響。

有關如何解決此問題的任何輸入?

回答

2

免責聲明: Spring的用戶,因此,我要在這裏提出一個純粹的AspectJ的解決方案。但它應該在Spring AOP中以相同的方式工作。您需要更改的唯一方法是從@DeclarePresedence切換到@Order以獲取寬高比配置,如Spring AOP manual中所述。

驅動程序:

package de.scrum_master.app; 

public class Application { 
    public static void main(String[] args) { 
     new Application().doSomething(); 
    } 

    public void doSomething() { 
     System.out.println("Doing something"); 
    } 
} 

節流異常類:

package de.scrum_master.app; 

public class ThrottlingException extends RuntimeException { 
    private static final long serialVersionUID = 1L; 

    public ThrottlingException(String arg0) { 
     super(arg0); 
    } 
} 

節流攔截:

爲了模擬節流的情況我創建了一個輔助方法isThrottled(),其中3例隨機返回true

package de.scrum_master.aspect; 

import java.util.Random; 

import org.aspectj.lang.JoinPoint; 
import org.aspectj.lang.annotation.Aspect; 
import org.aspectj.lang.annotation.Before; 

import de.scrum_master.app.ThrottlingException; 

@Aspect 
public class ThrottlingInterceptor { 
    private static final Random RANDOM = new Random(); 

    @Before("execution(* doSomething())") 
    public void invoke(JoinPoint thisJoinPoint) throws ThrottlingException { 
     System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint); 
     if (isThrottled()) { 
      throw new ThrottlingException("call throttled"); 
     } 
    } 

    private boolean isThrottled() { 
     return RANDOM.nextInt(3) > 0; 
    } 
} 

重試攔截:

請注意,AspectJ的註解@DeclarePrecedence("RetryInterceptor, *")說這個攔截器是之前任何其他的執行。請用兩個攔截器類上的@Order註釋替換它。否則,@Around建議無法捕獲限制攔截器引發的異常。

另外值得一提的是,這個攔截器不需要任何反射來實現重試邏輯,它直接在重試循環中使用連接點,以便重試thisJoinPoint.proceed()。這可以很容易地被分解成實施不同類型的重試行爲的幫助方法或助手類。只要確保使用ProceedingJoinPoint作爲參數而不是Callable

package de.scrum_master.aspect; 

import org.aspectj.lang.ProceedingJoinPoint; 
import org.aspectj.lang.annotation.Around; 
import org.aspectj.lang.annotation.Aspect; 
import org.aspectj.lang.annotation.DeclarePrecedence; 

import de.scrum_master.app.ThrottlingException; 

@Aspect 
@DeclarePrecedence("RetryInterceptor, *") 
public class RetryInterceptor { 
    private static int MAX_TRIES = 5; 
    private static int WAIT_MILLIS_BETWEEN_TRIES = 1000; 

    @Around("execution(* doSomething())") 
    public Object invoke(ProceedingJoinPoint thisJoinPoint) throws Throwable { 
     System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint); 
     ThrottlingException throttlingException = null; 
     for (int i = 1; i <= MAX_TRIES; i++) { 
      try { 
       return thisJoinPoint.proceed(); 
      } 
      catch (ThrottlingException e) { 
       throttlingException = e; 
       System.out.println(" Throttled during try #" + i); 
       Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES); 
      } 
     } 
     throw throttlingException; 
    } 
} 

控制檯登錄成功重試:

RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #1 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #2 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
Doing something 

控制檯登錄失敗重試:

RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #1 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #2 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #3 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #4 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #5 
Exception in thread "main" de.scrum_master.app.ThrottlingException: call throttled 
    at de.scrum_master.aspect.ThrottlingInterceptor.invoke(ThrottlingInterceptor.aj:19) 
    at de.scrum_master.app.Application.doSomething_aroundBody0(Application.java:9) 
    at de.scrum_master.app.Application.doSomething_aroundBody1$advice(Application.java:22) 
    at de.scrum_master.app.Application.doSomething(Application.java:1) 
    at de.scrum_master.app.Application.main(Application.java:5) 

隨意問有關我的回答任何後續問題。


更新:我不知道你是如何和RetryingCallableRetryPolicy /接口工作,你沒有告訴我很多關於它。但是我做了什麼,得到它的工作是這樣的:

package de.scrum_master.app; 

import java.util.concurrent.Callable; 

public interface RetryPolicy<V> { 
    V apply(Callable<V> callable) throws Exception; 
} 
package de.scrum_master.app; 

import java.util.concurrent.Callable; 

public class DefaultRetryPolicy<V> implements RetryPolicy<V> { 
    private static int MAX_TRIES = 5; 
    private static int WAIT_MILLIS_BETWEEN_TRIES = 1000; 

    @Override 
    public V apply(Callable<V> callable) throws Exception { 
     Exception throttlingException = null; 
     for (int i = 1; i <= MAX_TRIES; i++) { 
      try { 
       return callable.call(); 
      } 
      catch (ThrottlingException e) { 
       throttlingException = e; 
       System.out.println(" Throttled during try #" + i); 
       Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES); 
      } 
     } 
     throw throttlingException; 
    } 
} 
package de.scrum_master.app; 

import java.util.concurrent.Callable; 

public class RetryingCallable<V> { 
    private RetryPolicy<V> retryPolicy; 
    private Callable<V> callable; 

    public RetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) { 
     this.callable = callable; 
     this.retryPolicy = retryPolicy; 
    } 

    public static <V> RetryingCallable<V> newRetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) { 
     return new RetryingCallable<V>(callable, retryPolicy); 
    } 

    public V call() throws Exception { 
     return retryPolicy.apply(callable); 
    } 
} 

現在改變重試攔截這樣的:

package de.scrum_master.aspect; 

import java.util.concurrent.Callable; 

import org.aspectj.lang.ProceedingJoinPoint; 
import org.aspectj.lang.annotation.Around; 
import org.aspectj.lang.annotation.Aspect; 
import org.aspectj.lang.annotation.DeclarePrecedence; 

import de.scrum_master.app.DefaultRetryPolicy; 
import de.scrum_master.app.RetryPolicy; 
import de.scrum_master.app.RetryingCallable; 

@Aspect 
@DeclarePrecedence("RetryInterceptor, *") 
public class RetryInterceptor { 
    private RetryPolicy<Object> retryPolicy = new DefaultRetryPolicy<>(); 

    @Around("execution(* doSomething())") 
    public Object invoke(ProceedingJoinPoint thisJoinPoint) throws Throwable { 
     System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint); 
     return RetryingCallable.newRetryingCallable(
      new Callable<Object>() { 
       @Override 
       public Object call() throws Exception { 
        return thisJoinPoint.proceed(); 
       } 
      }, 
      retryPolicy 
     ).call(); 
    } 
} 

日誌輸出將是非常類似於你之前看到過。對我來說這很好。

+0

感謝您的回答。我已經在使用'@ Ordered'。在這裏有一個問題,因爲我使用'@ ThrowsException'的建議,我有種無限循環的感覺。但是你的代碼似乎可以用'@ Around'正常工作,我在這裏失蹤的細微差別是什麼? – AgentX

+0

也許你的意思是'@ AfterThrowing'。那麼,正如名字所暗示的那樣,這個通知類型會在方法執行完成後攔截它們,'@ Around'包裝自己_方法執行,即可以在執行之前和之後執行某些操作,捕獲並處理異常,重試方法執行等。 '@ AfterThrowing'已經太遲了,拋出異常並且方法執行結束。此外,請查看我的日誌中的異常callstack:異常在'ThrottlingInterceptor.invoke'中引發,因此您可能錯誤地使用了'@ AfterThrowing'切入點。 – kriegaex

+0

謝謝,我現在添加了一個循環並重新嘗試,正如你所解釋的那樣。最後還有一件事:在將來我想要使用一個通用的重試策略(它提供了良好的默認值,例如'3次重試','隨機性'和'指數退避'),正如我在上面的例子中所使用的('return RetryingCallable .newRetryingCallable(new Callable ()')有關如何添加它的任何輸入,而不是自定義循環? – AgentX