2015-06-17 61 views
2

繼Akka集羣文檔後,我運行了「工作人員撥入」示例。組合噴射路由+演員模式匹配

http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html

所以我試圖集成了用噴霧路由。

我的想法是在幕後創建一個羣集,並通過http休息來調用該服務。

所以我有以下代碼。

object Boot extends App { 

    val port = if (args.isEmpty) "0" else args(0) 
    val config = 
    ConfigFactory 
     .parseString(s"akka.remote.netty.tcp.port=$port") 
     .withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")) 
     .withFallback(ConfigFactory.load()) 

    val system = ActorSystem("ClusterSystem", config) 
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend") 
    implicit val actSystem = ActorSystem() 

    IO(Http) ! Http.Bind(frontend, interface = config.getString("http.interface"), port = config.getInt("http.port")) 
} 

class TransformationFrontend extends Actor { 

    var backends = IndexedSeq.empty[ActorRef] 
    var jobCounter = 0 
    implicit val timeout = Timeout(5 seconds) 

    override def receive: Receive = { 

    case _: Http.Connected => sender ! Http.Register(self) 

    case HttpRequest(GET, Uri.Path("/job"), _, _, _) => 

     jobCounter += 1 
     val backend = backends(jobCounter % backends.size) 

     val originalSender = sender() 

     val future : Future[TransformationResult] = (backend ? new TransformationJob(jobCounter + "-job")).mapTo[TransformationResult] 
     future onComplete { 
     case Success(s) => 
      println("received from backend: " + s.text) 
      originalSender ! s.text 
     case Failure(f) => println("error found: " + f.getMessage) 
     } 

    case job: TransformationJob if backends.isEmpty => 
     sender() ! JobFailed("Service unavailable, try again later", job) 

    case job: TransformationJob => 
     jobCounter += 1 
     backends(jobCounter % backends.size) forward job 

    case BackendRegistration if !backends.contains(sender()) => 
     println("backend registered") 
     context watch sender() 
     backends = backends :+ sender() 

    case Terminated(a) => 
     backends = backends.filterNot(_ == a) 
    } 
} 

但我真正想要做的是將噴霧路由與這些模式匹配相結合。

而不是寫我得到一個像上述情況,我想這樣寫:

path("job") { 
    get { 
    respondWithMediaType(`application/json`) { 
     complete { 
     (backend ? new TransformationJob(jobCounter + "-job")).mapTo[TransformationResult] 
     } 
    } 
    } 
} 

但與此類延長我的演員,我要做以下

def receive = runRoute(defaultRoute) 

如何我可以將此方法與TransformationFrontend Actor模式匹配方法結合使用嗎?後端註冊,終止,TransformationJob?

+1

如何將您的Spray請求轉發給具有模式匹配邏輯的另一個actor。如果你可以在消息中傳遞requestContext,那麼你可以從那裏完成HTTP請求。 –

回答

3

您可以撰寫PartialFunction就像ReceivePartialFunction.orElse

class TransformationFrontend extends Actor { 
    // ... 
    def myReceive: Receive = { 
    case job: TransformationJob => // ... 
    // ... 
    } 
    def defaultRoute: Route = 
    get { 
     // ... 
    } 
    override def receive: Receive = runRoute(defaultRoute) orElse myReceive 
} 

這就是說,它通常是有意義的,如果可能的功能分成幾個演員(如評論所說以上)。

+0

太棒了,我非常專注於將兩者混合在一起,我永遠不會想到將接收方法分開。謝謝。 –