0
我讀了阿卡流錯誤處理Akka Streams錯誤處理。如何知道哪一行失敗?
http://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-error.html
這篇文章寫了這樣的代碼。
val decider: Supervision.Decider = {
case _: Exception => Supervision.Restart
case _ => Supervision.Stop
}
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
val source = Source(1 to 10)
val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")}
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x))
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s =>
import GraphDSL.Implicits._
source ~> flow ~> s.in
ClosedShape
})
val future = graph.run()
future.onComplete{ _ =>
actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)
這工作得很好....除了我需要掃描輸出以查看哪一行沒有得到處理。有沒有辦法讓我打印/記錄失敗的行? [沒有在我寫的每一個流程中放置明確的try/catch塊?]
例如,如果我使用的是演員(而不是流),我可以寫一個演員的生命週期事件,我可以已經在重新啓動時重新啓動actor以及正在處理的消息時進行了記錄。
但在這裏我不明確使用演員(雖然他們在內部使用)。 Flow/Source/Sink是否存在生命週期事件?
你能叉流,並在那裏推壞到不同的「壞」的隊列,他們只是註銷(或任何你想要做的)? – Tyler
所以..爲了分岔流,你仍然必須捕獲所有的異常,並在catch塊路由到一個不同的流......不會很乏味嗎?是否有一個簡單的方法是用失敗的消息來分發流? –
哦,我不明白你的例子,我認爲你的條件聲明表示你知道事情要提前注意。 – Tyler