2015-04-22 51 views
1

我很好奇如何使用RxJava在Akka中實現盡力而爲的重試,而不使用持久Actor。這個想法是使用Rx的retry方法繼續詢問,直到從目標演員收到響應。使用RxJava的Akka「盡力而爲」

其他例子很難找到。是否有阿卡大師可以驗證這個實現,或者指出我有更好的解決方案?

實施例:

public class RxWithAkka { 

    private final Logger LOGGER = LoggerFactory.getLogger(getClass()); 

    public static final Timeout TIMEOUT = Timeout.apply(10, TimeUnit.MILLISECONDS); 
    private final ActorRef actor; 
    private final ActorSystem actorSystem; 

    public RxWithAkka(ActorSystem actorSystem) { 
     this.actorSystem = actorSystem; 
     this.actor = actorSystem.actorOf(Props.create(MyActor.class)); 
    } 

    public Observable<Object> ping() { 
     return createObservable() 
       .doOnError(t -> LOGGER.warn(t.getMessage())) 
       .retry(); 
    } 

    Observable<Object> createObservable() { 
     return Observable.create(subscriber -> { 
      LOGGER.info("Send ping"); 
      Patterns.ask(actor, "ping", TIMEOUT) 
        .onComplete(new OnComplete<Object>() { 
         @Override 
         public void onComplete(Throwable failure, Object success) throws Throwable { 
          if (success != null) { 
           subscriber.onNext(success); 
           subscriber.onCompleted(); 
          } else { 
           subscriber.onError(failure); 
          } 
         } 
        }, actorSystem.dispatcher()); 
     }); 
    } 
} 

例演員證明消息沒有接收和超時:

public class MyActor extends UntypedActor { 

     private int counter = 0; 

     @Override 
     public void onReceive(Object message) throws Exception { 
      switch (counter++) { 
       case 0: 
        // ignore message 
        break; 
       case 1: 
        // timeout 
        Thread.sleep(200); 
        break; 
       default: 
        getSender().tell("pong", getSelf()); 
      } 
     } 
    } 
} 

測試:

public class RxWithAkkaTest { 

    @Test 
    public void testIt() throws Exception { 
     ActorSystem system = ActorSystem.create("system"); 
     RxWithAkka example = new RxWithAkka(system); 
     String res = (String) example.ping().toBlocking().first(); 
     assertThat(res).isEqualTo("pong"); 
    } 
} 
+0

也許很明顯,但我會說無論如何:沒有堅持它不會至少一次,它會是「盡力而爲」重試。不是說這很糟糕,你的用例可能完全正確,只是試圖保持術語的正確性。 –

回答

1

在RxJava,可以使用timeout操作者結合與retry