2

我有一個遠程演員,Bar和本地演員Foo。我希望使用Foo在每次調用CLI時將消息傳遞給Bar如何使用Akka Remoting通過CLI將消息發送給遠程演員?

Bar可以成功傳遞消息,但Foo在等待消息時掛起。爲了解決這個問題,我在Foo的最後添加了一個sys.exit(0)。這會導致與Foo的系統有關聯問題。

如何在連續CLI發行之間關閉本地演員而不手動殺死本地演員?

閉嘴,給我代碼!


的Foo:

build.sbt

name := "Foo" 

version := "1.0" 

scalaVersion := "2.11.8" 

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11" 
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11" 
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0" 

fork in run := true 

Main.scala
import akka.actor._ 
import com.typesafe.config.ConfigFactory 

case class Config(mode: String = "", greeting: String="") 

class Foo extends Actor { 
    // create the remote actor 
    val BarActor = context.actorSelection("akka.tcp://[email protected]:2552/user/BarActor") 

    def receive = { 
    case method: String => BarActor ! method 
    } 
} 

object CommandLineInterface { 

    val config = ConfigFactory.load() 
    val system = ActorSystem("FooSystem", config.getConfig("FooApp")) 

    val FooActor = system.actorOf(Props[Foo], name = "FooActor") 

    val parser = new scopt.OptionParser[Config]("Foo") { 
    head("foo", "1.x") 

    help("help").text("prints usage text") 

    opt[String]('m', "method").action((x, c) => 
     c.copy(greeting = x)).text("Bar will greet with <method>") 
    } 
} 

object Main extends App { 
    import CommandLineInterface.{parser, FooActor} 

    parser.parse(args, Config()) match { 
    case Some(config) => FooActor ! config.greeting 
    case None => sys.error("Bad news...") 
    } 
    /* 
    When sys.exit(0) commented, this hangs and Bar greet. 
    When sys.exit(0) uncommented, this doesn't hang, but also Bar doesn't greet. 
    */ 

    //sys.exit(0) 
} 

application.conf

FooApp { 
    akka { 
    loglevel = "INFO" 
    actor { 
     provider = "akka.remote.RemoteActorRefProvider" 
    } 
    remote { 
     enabled-transports = ["akka.remote.netty.tcp"] 
     netty.tcp { 
     hostname = "127.0.0.1" 
     port = 0 
     } 
     log-sent-messages = on 
     log-received-messages = on 
    } 
    } 
} 

酒吧:

build.sbt

name := "Bar" 

version := "1.0" 

scalaVersion := "2.11.8" 

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11" 
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11" 

Main.scala

import akka.actor._ 
import com.typesafe.config.ConfigFactory 

class Bar extends Actor { 
    def receive = { 
    case greeting: String => Bar.greet(greeting) 
    } 
} 

object Bar { 
    val config = ConfigFactory.load() 
    val system = ActorSystem("BarSystem", config.getConfig("BarApp")) 
    val BarActor = system.actorOf(Props[Bar], name = "BarActor") 

    def greet(greeting: String) = println(greeting) 

    def main(args: Array[String]): Unit = { 
    /* Intentionally empty */ 
    } 
} 

application.conf

BarApp { 
    akka { 
    loglevel = "INFO" 
    actor { 
     provider = remote 
    } 
    remote { 
     enabled-transports = ["akka.remote.netty.tcp"] 
     netty.tcp { 
     hostname = "127.0.0.1" 
     port = 2552 
     } 
     log-sent-messages = on 
     log-received-messages = on 
    } 
    } 
} 

sbt 'run-main Main -m hello'運行Foo,並與sbt 'run-main Main'運行Bar

對不起,長的代碼,但它是我的問題MVCE。

我該如何實現自己想要的行爲 - CLI角色會在連續的CLI調用與遠程參與者等待新消息之間死亡。

+1

爲什麼你認爲'Bar'死了?日誌中是否有指示這一點的內容? –

+0

@PawełBartkiewicz我試圖澄清我的意思。對不起,這個錯誤。 :)希望它更清楚。 – erip

回答

3

發生這種情況,因爲你將消息發送到FooActor後立即撥打sys.exit(0),所以有顯著的機會,FooActor之前退出應用程序獲取到甚至閱讀信息的機會,更不用說將其轉發給BarActor

似乎有成爲many possible solutions,其中之一是:

class Foo extends Actor { 
    // create the remote actor 
    val BarActor = context.actorSelection("akka.tcp://[email protected]:2552/user/BarActor") 

    override def receive = { 
    case method: String => { 
     BarActor ! method 
     self ! PoisonPill 
    } 
    } 

    override def postStop = { 
    context.system.terminate 
    } 
} 

不幸的是,事實證明,該系統仍然被分派消息Bar之前關閉。

如果您想以「消防和忘記」的風格發送消息,我無法找到任何合理的解決方案。然而,在大多數情況下,它需要獲得某種從遠程演員的反應,所以你可以這樣做:

class Foo extends Actor { 
    // create the remote actor 
    val BarActor = context.actorSelection("akka.tcp://[email protected]:2552/user/BarActor") 

    override def receive = { 
    case method: String => { 
     BarActor ! method 
     context.become(waitingToKillMyself) 
    } 
    } 

    def waitingToKillMyself: Receive = { 
    case response: String => { 
     println(response) 
     self ! PoisonPill 
    } 
    } 

    override def postStop = { 
    context.system.terminate 
    } 
} 

// ... 

object Main extends App { 
    import CommandLineInterface.{parser, FooActor, system} 
    import system.dispatcher 

    parser.parse(args, Config()) match { 
    case Some(config) => { 
     FooActor ! config.greeting 
     system.scheduler.scheduleOnce(10.seconds, FooActor, PoisonPill) 
    } 

    case None => sys.error("Bad news...") 
    } 
} 

酒吧:

class Bar extends Actor { 
    def receive = { 
    case greeting: String => { 
     Bar.greet(greeting) 
     sender() ! "OK" 
    } 
    } 
} 
+0

這似乎是一個固定的解決方案,如果它是一次性消息,但我希望'酒吧'基本上總是等待消息。這會達到這個目的嗎? – erip

+1

BarSystem不是一個單獨的演員系統嗎? –

+0

對不起,編輯後沒有看到您的評論。是的,我相信這應該可以正常工作(現在不能自己嘗試,如果你想的話,我可以稍後再做)。由於'FooSystem'和'BarSystem'是獨立的actor系統,'shutdown'或'terminate'只能停止'FooSystem'([documentation](http://doc.akka.io/api/akka/2.4/index.html #[email protected]():scala.concurrent.Future [akka.actor.Terminated]))。 –

相關問題