2012-07-10 67 views
1

我有一個應用程序,使用akka,現在我想通過套接字連接來連接它。因此,我使用類似於the one from the scala page的機制。 但是,如果我嘗試tell,而我打開OutputStream,目標沒有收到任何消息。套接字連接和ActorSystem

這裏是我的源代碼:

object Connector { 

    def main(args: Array[String]) { 
    val port = 1337 
    val conf = ConfigFactory.load 
    val system = ActorSystem("SDDB", conf.getConfig("SDDB")) 
    val master = system.actorOf(Props[TestActor]) 
    master ! "a" 

    try { 
     val listener = new ServerSocket(port) 
     println("listening on port: " + port) 
     while (true) 
     new ConnectionThread(listener accept, master).start 
     listener close 
    } catch { 
     case e: IOException => 
     System.err.println("Could not listen on port: " + port + ".") 
     System.exit(-1) 
    } finally { 
     system.shutdown 
    } 
    } 
} 

case class ConnectionThread(socket: Socket, master: ActorRef) 
    extends Thread("ConnectionThread") { 

    private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r 
    private implicit var id = 0L 
    private implicit val timeout = Timeout(25.0 seconds) 

    master ! "b" 

    override def run { 
    master ! "c" 
    try{ 
     master ! "d" 
     val in = new ObjectInputStream(socket getInputStream) 
     master ! "e" 
     val out = new ObjectOutputStream(socket getOutputStream) 

     out writeObject("listening") 
     out flush 

     master ! "f" 
     val command = in.readObject.asInstanceOf[String] 
     println("client sent: '" + command + "'") 
     // process the command 

     master ! "g" 
     out.writeObject("EOF") 
     out.flush 

     out.close 
     in.close 
     socket.close 
    } catch { 
     case e: SocketException => 
     case e: IOException => e printStackTrace 
    } 
    } 
} 

class TestActor extends Actor with ActorLogging{ 

    log info("TestActor running") 

    def receive = { 
    case s: String => 
     log info("received: " + s) 
    } 

} 

我得到的輸出:

listening on port: 1337 
[INFO] TestActor running 
[INFO] received: a 
[INFO] received: b 
[INFO] received: c 
[INFO] received: d 

現在,我希望它繼續下去,直到克,而是我得到:

client sent: 'select content from testdata on 2012-07-06' 

我想通了,直到我打開一個插座流,可能是因爲tellask也是基於套接字的,並且使用套接字的輸出流,踏入運行。之後,套接字連接起作用,但是我不能將任何消息發送給參與者系統。
有沒有辦法讓我放棄連接器和ConnectionThread。我該如何解決它?

+0

http://doc.akka.io/docs/akka/2.0.2/scala/io.html – 2012-07-10 12:24:27

+0

感謝您的快速回答。不幸的是,我無法從文檔中獲得我代碼中的更改。你能給我一個提示如何改變它嗎? – 2012-07-11 07:23:27

+0

我可以回答特定問題,但我沒有時間爲您編寫解決方案。 – 2012-07-11 10:39:12

回答

0

我必須承認,我並沒有完全理解文檔中的例子。但我發現使用ConnectionHelper而不是直接處理ActorRef的工作非常好。
我改變了我的代碼如下:

object Connector { 

    def main(args: Array[String]) { 
    val port = 1337 
    val conf = ConfigFactory.load 
    val system = ActorSystem("SDDB", conf.getConfig("SDDB")) 

    // val master = system.actorOf(Props[TestActor], "master") 
    // master ! "a" 

    try { 
     val listener = new ServerSocket(port) 
     println("listening on port: " + port) 
     while (true) 
     //  new ConnectionThread(listener accept, master.asInstanceOf[TestActor]).start 
     new ConnectionThread(listener accept, system).start 
     listener close 
    } catch { 
     case e: IOException => 
     System.err.println("Could not listen on port: " + port + ".") 
     System.exit(-1) 
    } finally { 
     //  master ! PoisonPill 
     system.shutdown 
    } 
    } 

} 

case class ConnectionThread(socket: Socket, sys: ActorSystem) 
    extends Thread("ConnectionThread") { 

    private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r 
    private implicit var id = 0L 
    private implicit val timeout = Timeout(25.0 seconds) 
    private val conHelper = new ConnectionHelper 

    override def run { 
    try { 
     val out = new ObjectOutputStream(socket getOutputStream) 
     val in = new ObjectInputStream(socket getInputStream) 

     conHelper tell "funzt" 
     out writeObject ("Hi") 
     out.flush 
     val command = in.readObject.asInstanceOf[String] 
     println("received: " + command) 
     out writeObject ("test") 
     out.flush 
     out writeObject ("EOF") 
     out.flush 

     out.close 
     in.close 
     socket.close 
    } 
    } 

    private class ConnectionHelper { 
    val tester = sys.actorOf(Props[TestActor]) 

    def tell(s: String) { tester ! s } 

    } 

} 

我真的不明白爲什麼這個工程,並從我的問題的代碼不會。我歡迎所有的解釋。