2016-09-13 25 views
8

我有以下要求阿卡模型計算任務

  • 連接到同一個用戶名和密碼,網絡服務器,並得到一個authetication令牌
  • 讀文件,以獲得不同的參數
  • 使用身份驗證令牌來回步驟1和步驟2的參數發送http請求到網絡服務器

現在我有一個演員執行所有上述任務,如下所示

package akka.first.java; 

import akka.actor.UntypedActor; 

public class MySingleActor extends UntypedActor { 

    public void onReceive(Object msg) { 

     if(msg instanceof sendRequest) { 

      //Connect to a webserver with a username and password and get an authetication token 
      String token = getToken(); 
      // Read file to get different parameters 
      Param param = readFile(); 
      // Use the auth token fro step 1 and parameters from step 2 to send an http request to the web server 
      Response response = sendRequest (server, token, param); 


     } 

    } 

    private Param readFile() { 
     // reads file 
    } 

    private String getToken() { 
     //gets token 
    } 
} 

readFile操作包含各種子任務,我認爲它應該是一個單獨的actor。 但是由於參與者需要從readFile()操作返回來執行發送請求的主要任務,所以這可能會阻止根據文檔不推薦這麼做,那麼最好的方法是什麼?期貨?

回答

2

您可以使用期貨,也可以使用RxJava和Observable and Observers。 或不同的演員和轉發給orginial發件人

public class MySingleActor extends UntypedActor{ 

private ActorRef tokenActor; 
private ActorRef readFileActor; 

public MySingleActor(){ 
    tokenActor = context().actorOf(Props.create(TokenActor.class),"tokenActor"); 
    readFileActor = context().actorOf(Props.create(ReadFileActor.class),"readFileActor"); 
} 
public void onReceive(Object msg) { 
    if(msg instanceof sendRequest) { 
     Future<String> f= Futures.future(new Callable<String>() { 
      @Override public String call() throws Exception { 
       return getToken(); 
      }   },context().dispatcher());Patterns.pipe(f,context().dispatcher()).to(tokenActor).pipeTo(readFileActor,self()); 
    }  
}} 

的最終響應或代替管

f.onComplete(new OnComplete<String>(){ public void onComplete(Throwable t, String result){ readFileActor.tell(result,self()); } }, context().system().dispatcher());

+0

你是指阿卡期貨?如果我轉發響應,它會阻止,直到收到響應? –

+0

在readfile參與者,使用scala期貨,並onsuccess轉發迴應給發件人 – gaston

+0

我仍然不清楚,你可以添加更多的上下文來看代碼的樣子?未來將是演員從演員系統外部獲取的數據的佔位符正確,而這個未來將演變爲'MySingleActor演員作爲轉發的味精? –

5

Official documentation提供以下解決方案:

  • 執行阻塞調用在一個角色(或由路由器[Java,Scala]管理的一組角色)內,確保配置專用於此目的或足夠大小的線程池。
  • 在Future中執行阻塞調用,確保在任何時間點上調用此類調用數量的上限(提交無限數量的此類任務將耗盡您的內存或線程限制)。
  • 在Future中執行阻塞調用,爲線程池提供適用於運行該應用程序的硬件的線程數上限。
  • 專用單個線程來管理一組阻塞資源(例如,驅動多個通道的NIO選擇器)並在事件作爲參與者消息發生時分派事件。

使用期貨是官方建議的方法之一,但要格外小心。

讓我們考慮第一種方法,因爲IMO更一致。

首先將所有阻塞IO操作提取到僅執行一個阻塞IO操作的新參與者中。假設只有一個爲了簡潔這樣的操作:

public class MyBlockingIOActor extends UntypedActor { 
    public void onReceive(Object msg) { 
     // do blocking IO call here and send the result back to sender 
    } 
} 

添加configuration for dispatcher,那會照顧阻塞演員,在actor system configuration file(通常application.conf):

#Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO 
blocking-io-dispatcher { 
    type = Dispatcher 
    executor = "thread-pool-executor" 
    thread-pool-executor { 
    fixed-pool-size = 32 
    } 
    throughput = 1 
} 

請確保您使用創建演員系統時的配置文件(特別是如果您決定使用非標準文件名進行配置):

ActorSystem actorSystem = ActorSystem.create("my-actor-system", ConfigFactory.load("application.conf")); 

之後,您要將執行阻止IO的actor分配給專用調度程序。

ActorRef blockingActor = context().actorOf(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher")); 

爲了獲得更大的吞吐量,可以考慮包裝阻隔演員入池:

SupervisorStrategy strategy = new OneForOneStrategy(
     5, 
     Duration.create(1, TimeUnit.MINUTES), 
     Collections.singletonList(Exception.class) 
); 
ActorRef blockingActor = context().actorOf(new SmallestMailboxPool(5).withSupervisorStrategy(strategy).props(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher"))); 

可以確保演員可以描述here或創建的演員時做在配置按以下方式使用合適的調度員:

public class MyBlockingIOActor extends UntypedActor { 
    public void preStart() { 
     LOGGER.debug("using dispatcher: {}", ((Dispatcher)context().dispatcher()).id()); 
    } 
}