1
我很好奇如何使用RxJava在Akka中實現盡力而爲的重試,而不使用持久Actor。這個想法是使用Rx的retry
方法繼續詢問,直到從目標演員收到響應。使用RxJava的Akka「盡力而爲」
其他例子很難找到。是否有阿卡大師可以驗證這個實現,或者指出我有更好的解決方案?
實施例:
public class RxWithAkka {
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
public static final Timeout TIMEOUT = Timeout.apply(10, TimeUnit.MILLISECONDS);
private final ActorRef actor;
private final ActorSystem actorSystem;
public RxWithAkka(ActorSystem actorSystem) {
this.actorSystem = actorSystem;
this.actor = actorSystem.actorOf(Props.create(MyActor.class));
}
public Observable<Object> ping() {
return createObservable()
.doOnError(t -> LOGGER.warn(t.getMessage()))
.retry();
}
Observable<Object> createObservable() {
return Observable.create(subscriber -> {
LOGGER.info("Send ping");
Patterns.ask(actor, "ping", TIMEOUT)
.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object success) throws Throwable {
if (success != null) {
subscriber.onNext(success);
subscriber.onCompleted();
} else {
subscriber.onError(failure);
}
}
}, actorSystem.dispatcher());
});
}
}
例演員證明消息沒有接收和超時:
public class MyActor extends UntypedActor {
private int counter = 0;
@Override
public void onReceive(Object message) throws Exception {
switch (counter++) {
case 0:
// ignore message
break;
case 1:
// timeout
Thread.sleep(200);
break;
default:
getSender().tell("pong", getSelf());
}
}
}
}
測試:
public class RxWithAkkaTest {
@Test
public void testIt() throws Exception {
ActorSystem system = ActorSystem.create("system");
RxWithAkka example = new RxWithAkka(system);
String res = (String) example.ping().toBlocking().first();
assertThat(res).isEqualTo("pong");
}
}
也許很明顯,但我會說無論如何:沒有堅持它不會至少一次,它會是「盡力而爲」重試。不是說這很糟糕,你的用例可能完全正確,只是試圖保持術語的正確性。 –