2017-05-05 47 views
0

我正在使用Akka(2.5.1)在Java(8)Spring Boot(1.5.2.RELEASE)應用程序中使用反應模式。這很好,但現在我被困在試圖從actor中運行CompletableFuture。爲了模擬這個,我創建了一個返回CompletableFuture的非常簡單的服務。但是,當我然後嘗試將結果返回給調用控制器時,我收到有關死信的錯誤,並且沒有返回響應。從Actor中運行CompletableFuture

我正的錯誤是:

[INFO] [2017年5月5日13:12:25.650] [阿卡 - 彈簧 - 演示akka.actor.default-調度-5] [ akka:// akka-spring-demo/deadLetters] Actor [akka:// akka-spring-demo/user/$ a#-1561144664]的消息[java.lang.String]給Actor [akka:// akka-spring -demo/deadLetters]未送達。 [1]遇到了死信。可以關閉此日誌記錄,也可以使用配置設置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'來調整日誌記錄。

這是我的代碼。這是調用演員控制器:

@Component 
@Produces(MediaType.TEXT_PLAIN) 
@Path("/") 
public class AsyncController { 
    @Autowired 
    private ActorSystem system; 

    private ActorRef getGreetingActorRef() { 
     ActorRef greeter = system.actorOf(SPRING_EXTENSION_PROVIDER.get(system) 
        .props("greetingActor")); 

     return greeter; 
    } 

    @GET 
    @Path("/foo") 
    public void test(@Suspended AsyncResponse asyncResponse, @QueryParam("echo") String echo) { 
     ask(getGreetingActorRef(), new Greet(echo), 1000) 
      .thenApply((greet) -> asyncResponse.resume(Response.ok(greet).build())); 
    } 
} 

這裏的服務:

@Component 
public class GreetingService { 
    public CompletableFuture<String> greetAsync(String name) { 
     return CompletableFuture.supplyAsync(() -> "Hello, " + name); 
    } 
} 

那麼這裏就是演員接到電話。起初,我有這樣的:

@Component 
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
public class GreetingActor extends AbstractActor { 
    @Autowired 
    private GreetingService greetingService; 

    @Autowired 
    private ActorSystem system; 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
       .match(Greet.class, this::onGreet) 
       .build(); 
    } 

    private void onGreet(Greet greet) { 
     greetingService.greetAsync(greet.getMessage()) 
      .thenAccept((greetingResponse) -> getSender().tell(greetingResponse, getSelf())); 
    } 

} 

這導致2個調用正確處理,但之後,我會得到死信錯誤。然後我讀到這裏什麼可能導致我的問題: http://doc.akka.io/docs/akka/2.5.1/java/actors.html

警告 當使用未來的回調,你需要仔細避免關閉在包含演員的參考裏面的演員,即不調用方法或訪問可變狀態上來自回調中的封閉演員。這會破壞actor封裝並可能引入同步錯誤和競爭條件,因爲回調將同時安排給封閉的actor。不幸的是,在編譯時還沒有辦法檢測這些非法​​訪問。參見:參與者和共享的可變狀態

所以我想到的是,你將結果傳遞給self()之後,你可以做getSender()。tell(response,getSelf())。

所以我改變了我的代碼如下:

@Component 
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
public class GreetingActor extends AbstractActor { 
    @Autowired 
    private GreetingService greetingService; 

    @Autowired 
    private ActorSystem system; 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
       .match(Greet.class, this::onGreet) 
       .match(String.class, this::onGreetingCompleted) 
       .build(); 
    } 

    private void onGreet(Greet greet) { 
     pipe(greetingService.greetAsync(greet.getMessage()), system.dispatcher()).to(getSelf()); 
    } 

    private void onGreetingCompleted(String greetingResponse) { 
     getSender().tell(greetingResponse, getSelf()); 
    } 

} 

的onGreetingCompleted方法被調用從GreetingService的響應,但在那個時候,我再次得到了死字母錯誤,以便出於某種原因它可以」將響應發回給呼叫控制器。

需要注意的是,如果我改變服務這樣的:在演員

@Component 
public class GreetingService { 
    public String greet(String name) { 
     return "Hello, " + name; 
    } 
} 

而且onGreet到:

private void onGreet(Greet greet) { 
    getSender().tell(greetingService.greet(greet.getMessage()), getSelf()); 
} 

然後一切工作正常。所以看起來我的基本Java/Spring/Akka正確設置了,它只是在我的演員試圖調用一個CompletableFuture來解決問題的時候。

任何幫助將不勝感激,謝謝!

+0

deadLetters意味着要麼發送消息給無效的(過時的)引用。有了這個問題,如果在響應之前詢問超時(通過實際創建臨時演員請求工作),就會發生這種情況。也許發佈這個錯誤會有所幫助。儘管我必須承認我沒有很多想法。 –

+0

我編輯了我的帖子以顯示我收到的錯誤。我意識到死信的含義以及如何獲得它們。我只是不知道爲什麼我在這個特殊情況下得到他們。 –

+0

謝謝,我需要看看什麼樣的消息類型是去哪裏,誰是實際的預期收件人。希望答案解釋你的疑惑。 –

回答

1

getSender方法只能在消息的同步處理期間可靠地返回發件人的引用。

在你的第一種情況下,你必須:

greetingService.greetAsync(greet.getMessage()) 
     .thenAccept((greetingResponse) -> getSender().tell(greetingResponse, getSelf())); 

這意味着getSender()被調用一次異步未來完成。不再可靠。您可以更改到:

ActorRef sender = getSender(); 
greetingService.greetAsync(greet.getMessage()) 
     .thenAccept((greetingResponse) -> sender.tell(greetingResponse, getSelf())); 

在你的第二個例子,你有

pipe(greetingService.greetAsync(greet.getMessage()), system.dispatcher()).to(getSelf()); 

您管道將響應 「getSelf()」,即你的工人演員。原始發件人將永遠不會收到任何內容(因此問題會過期)。您可以修復到:

pipe(greetingService.greetAsync(greet.getMessage()), system.dispatcher()).to(getSender()); 

在第三種情況下,你有getSender()被該消息的處理過程中同步執行的,因此它的工作原理。

+0

謝謝Diego!這確實有效。週末愉快! –

相關問題