假設我有一個工作者actor接收消息,進行一些處理並返回結果。並予有需要被轉換成結果的序列消息的序列:如果我運行上面只用「消息1」如何使用提問模式編譯結果序列
object Test {
case class Message(str: String)
case class Result(str: String)
class Worker extends Actor {
def receive = {
case Message(data) =>
println("Sleeping: " + data)
Thread.sleep(10000)
val result = Result(data + " - result")
println("Sending result: " + result)
sender ! result
}
}
def test(messages: Seq[Message]): Future[Seq[Result]] = {
val worker = ActorSystem().actorOf(Props(new Worker))
val results = messages.map { m =>
implicit val timeout = Timeout(20 seconds)
println("Sending: " + m)
val result = worker ? m
result.asInstanceOf[Future[Result]]
}
Future.sequence(results)
}
def main(args: Array[String]): Unit = {
val messages: Seq[Message] = args.map(Message(_))
test(messages).foreach { r =>
println("Result: " + r)
}
}
}
作爲參數運行良好給予的輸出低於:
發送:消息(消息1)
睡眠:消息1
硒nding結果:結果(消息1 - 結果)
結果:ArraySeq(結果(消息1 - 結果))
但是說我做它: 「消息1」,「消息2 「‘消息3’,那麼最後的消息最終被髮送到deadLetters:
發送:消息(消息1)發送:消息(消息2)睡眠: 消息1發送:消息(消息-3)
發送結果:結果(消息1 - 結果)
睡眠:消息2
發送結果:結果(消息-2 - 結果)
睡眠:消息3
發送結果:結果(消息3 - 結果)
[INFO] [07/15/2016 09:07:49.832] [default-akka.actor.default-dispatcher-2] [akka:// default/deadLetters] Message [util.Tester $ Result] from 演員[akka:// default/user/$ a#1776546850]到 演員[akka:// default/deadLetters]未送達。 [1]遇到死亡字母 。此日誌記錄可以關閉或通過 配置設置'akka.log-dead-letters'和 'akka.log-dead-letters-during-shutdown'進行調整。
我猜這是因爲我的調用線程在發送最後一條消息時超出了範圍。如何正確地將所有結果收集到一個序列中?
注意改變自己的測試方法,下面給出了相同的結果:
def test(messages: Seq[Message]): Future[Seq[Result]] = {
val worker = ActorSystem().actorOf(Props(new Worker))
Future.traverse(messages) { m =>
implicit val timeout = Timeout(20 seconds)
println("Sending: " + m)
val result = worker ? m
result.asInstanceOf[Future[Result]]
}
}
變量「actor」不存在,你的意思是'worker'在這裏嗎?不知道你爲什麼看到死信,除非你的工作人員沒有正確響應,就像可能使用'sender()'不正確。此外,您可能想要查看「Future.sequence」以將所有結果作爲單個「Future」來獲得。但我無法確定這些問題是否能解決您的問題。 – acjay