2016-02-15 58 views
2

Akka 2.4.1 Java API here。我現在沒有帶寬學習Scala,所以我會問這裏的代碼示例也使用Java API。如何使阻塞/同步調用進入Akka actor系統?

我有一個現有的ActorSystem,它充滿了異步的演員,並且工作得很漂亮。我現在有一個需要重用這個演員系統在同步情況下,像這樣:

// Groovy pseudo-code; NOT inside the actor system here! 
ComputationRequest request = new ComputationRequest(1, 7, true) 
MasterActor master = actorSystem.get(...) 
ComputationResult result = actorSystem.tell(master, request) 

無處阿卡的文檔中我會看到發送equests到演員系統(來自外部),然後檢索的一個明顯的例子結果。我可以在這裏使用Futures嗎?在Akka(代碼示例)中處理這種模式的標準方式是什麼?

回答

9

ask模式做你想要的。它期望目標行動者通過tell將其回覆到getSender()。你會得到一個Future的迴應,並可以與該(阻止,如果你必須的話)。

import akka.dispatch.*; 
import scala.concurrent.ExecutionContext; 
import scala.concurrent.Future; 
import scala.concurrent.Await; 
import scala.concurrent.Promise; 
import akka.util.Timeout; 


ComputationRequest request = new ComputationRequest(1, 7, true); 
MasterActor master = actorSystem.get(...); 

Timeout timeout = new Timeout(Duration.create(5, "seconds")); 
Future<Object> future = Patterns.ask(master, request, timeout); 
ComputationResult result = (ComputationResult) Await.result(future, timeout.duration()); 
0

你可以注入回調,裹成一個演員,到阿卡系統,並使用它作爲「發件人」與力學告訴。完整示例:

import akka.actor.*; 
import akka.dispatch.Await; 
import akka.dispatch.Future; 
import akka.pattern.Patterns; 
import akka.util.Timeout; 

import java.util.concurrent.TimeUnit; 
import java.util.function.Consumer; 

public class Main { 

    public static void main(String[] args) throws Exception { 

     // init your akka system 
     final ActorSystem system = ActorSystem.create("Cambria"); 
     final ActorRef worker = system.actorOf(new Props(Worker.class), "worker"); 
     worker.tell(new Work(10)); 

     // make callback 
     final Consumer<Object> callback = a -> System.out.println("from the other side: " + a); 

     // wrap call back into sub-actor 
     class TheSpy extends UntypedActor { 
      @Override 
      public void onReceive(final Object message) throws Exception { 
       callback.accept(message); 
      } 
     } 

     // inject callback into the system 
     final ActorRef theSpy = system.actorOf(new Props(TheSpy::new), "spy"); 

     // find actor you want to hack 
     final ActorSelection found = system.actorSelection("/user/worker"); 
     // send it a message and observe using callback) 
     found.tell(new Work(20), theSpy); 

     final Timeout timeout = new Timeout(5, TimeUnit.SECONDS); 
     final Future<Object> future = Patterns.ask(worker, new Work(30), timeout); 
     final Work result = (Work) Await.result(future, timeout.duration()); 
     System.out.println(result); 

     system.shutdown(); 
     system.awaitTermination(); 
    } 
} 

public class Worker extends UntypedActor { 
    public void onReceive(Object message) { 
     if (message instanceof Work) { 
      Work work = (Work) message; 
      System.out.println("work = " + work); 
      getSender().tell(new Work(work.getStart() + 1)); 
     } else { 
      unhandled(message); 
     } 
    } 
} 

public class Work { 
    private final int start; 

    public Work(int start) { 
     this.start = start; 
    } 

    public int getStart() { 
     return start; 
    } 

    @Override 
    public String toString() { 
     return "Work{" + 
       "start=" + start + 
       '}'; 
    } 
}