2017-09-21 79 views
0

我讀了阿卡流錯誤處理Akka Streams錯誤處理。如何知道哪一行失敗?

http://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-error.html

這篇文章寫了這樣的代碼。

val decider: Supervision.Decider = { 
    case _: Exception => Supervision.Restart 
    case _ => Supervision.Stop 
} 

implicit val actorSystem = ActorSystem() 
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)) 

val source = Source(1 to 10) 
val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")} 
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x)) 
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s => 
    import GraphDSL.Implicits._ 
    source ~> flow ~> s.in 
    ClosedShape 
}) 
val future = graph.run() 
future.onComplete{ _ => 
    actorSystem.terminate() 
} 
Await.result(actorSystem.whenTerminated, Duration.Inf) 

這工作得很好....除了我需要掃描輸出以查看哪一行沒有得到處理。有沒有辦法讓我打印/記錄失敗的行? [沒有在我寫的每一個流程中放置明確的try/catch塊?]

例如,如果我使用的是演員(而不是流),我可以寫一個演員的生命週期事件,我可以已經在重新啓動時重新啓動actor以及正在處理的消息時進行了記錄。

但在這裏我不明確使用演員(雖然他們在內部使用)。 Flow/Source/Sink是否存在生命週期事件?

+0

你能叉流,並在那裏推壞到不同的「壞」的隊列,他們只是註銷(或任何你想要做的)? – Tyler

+0

所以..爲了分岔流,你仍然必須捕獲所有的異常,並在catch塊路由到一個不同的流......不會很乏味嗎?是否有一個簡單的方法是用失敗的消息來分發流? –

+0

哦,我不明白你的例子,我認爲你的條件聲明表示你知道事情要提前注意。 – Tyler

回答

1

只是一個小修改代碼:

val decider: Supervision.Decider = { 
    case e: Exception => 
    println("Exception handled, recovering stream:" + e.getMessage) 
    Supervision.Restart 
    case _ => Supervision.Stop 
} 

如果傳遞有意義的消息流中的例外,例如線,您可以在監督決勝局打印出來。

我用println給一個快速和簡單的答案,但強烈建議您使用 一些日誌庫,如scala-logging

相關問題