5
A
回答
14
我不認爲有一種方法可以爲任意的圖做到這一點,但是如果你有你的圖在控制之下,你只需要將監視接收器附加到每個可能失敗或完成的節點的輸出(這些都是至少有一個輸出節點),例如:
import akka.actor.Status
// obtain graph parts (this can be done inside the graph building as well)
val source: Source[Int, NotUsed] = ...
val flow: Flow[Int, String, NotUsed] = ...
val sink: Sink[String, NotUsed] = ...
// create monitoring actors
val aggregate = actorSystem.actorOf(Props[Aggregate])
val sourceMonitorActor = actorSystem.actorOf(Props(new Monitor("source", aggregate)))
val flowMonitorActor = actorSystem.actorOf(Props(new Monitor("flow", aggregate)))
// create the graph
val graph = GraphDSL.create() { implicit b =>
import GraphDSL._
val sourceMonitor = b.add(Sink.actorRef(sourceMonitorActor, Status.Success(()))),
val flowMonitor = b.add(Sink.actorRef(flowMonitorActor, Status.Success(())))
val bc1 = b.add(Broadcast[Int](2))
val bc2 = b.add(Broadcast[String](2))
// main flow
source ~> bc1 ~> flow ~> bc2 ~> sink
// monitoring branches
bc1 ~> sourceMonitor
bc2 ~> flowMonitor
ClosedShape
}
// run the graph
RunnableGraph.fromGraph(graph).run()
class Monitor(name: String, aggregate: ActorRef) extends Actor {
override def receive: Receive = {
case Status.Success(_) => aggregate ! s"$name completed successfully"
case Status.Failure(e) => aggregate ! s"$name completed with failure: ${e.getMessage}"
case _ =>
}
}
class Aggregate extends Actor {
override def receive: Receive = {
case s: String => println(s)
}
}
也可以建立只有一個監測演員和各監測接收器使用它,但在這種情況下,你將無法在失敗的流之間輕鬆區分。
另外還有watchTermination()
關於來源和流量的方法,它允許實現與流量一起終止的未來。我想這可能是難以GraphDSL
使用,但與常規流的方法可能是這樣的:
import akka.Done
import akka.actor.Status
import akka.pattern.pipe
val monitor = actorSystem.actorOf(Props[Monitor])
source
.watchTermination()((f, _) => f pipeTo monitor)
.via(flow).watchTermination((f, _) => f pipeTo monitor)
.to(sink)
.run()
class Monitor extends Actor {
override def receive: Receive = {
case Done => println("stream completed")
case Status.Failure(e) => println(s"stream failed: ${e.getMessage}")
}
}
你可以它的值通過管道傳遞給演員流之間進行區分之前改變未來。
相關問題
- 1. 如何關閉阿卡流?
- 2. 如何測試的阿卡流閉合形狀可運行的曲線圖封裝有源和宿
- 3. 阿卡卡夫卡流監理策略不工作
- 4. 阿卡流 - Source.fromPublisher
- 5. Pickerview內封閉視圖
- 6. 阿卡流+阿卡-HTTP生命週期
- 7. java.io.ioexception試圖在封閉流中讀取
- 8. 查找圖像中的封閉形狀
- 9. 監視流類
- 10. 從封閉流中讀取
- 11. 輸出到封閉的流?
- 12. 阿卡關閉TCP演員
- 13. 測試阿卡反應流
- 14. 阿卡輸入流處理
- 15. 閱讀使用阿卡流
- 16. 阿卡流OnNext不允許
- 17. 多邊形不封閉
- 18. 呈現封閉的Marionette視圖
- 19. 阿帕奇卡夫卡和Spark流
- 20. 開放形式和封閉形式
- 21. 阿卡定製監事未生效
- 22. 阿卡演員定製監事
- 23. 阿卡:測試監控\死看
- 24. 監視USB流量
- 25. 斯卡拉阿卡流合併過濾器和地圖
- 26. 阿卡流+阿卡的Http傳遞參數
- 27. 阿卡流+阿卡的Http - 獲取上的錯誤
- 28. Python龜圖形填充非封閉多邊形
- 29. 阿卡流TCP +阿卡流卡夫卡生產者未停止不發佈消息,而不是錯誤-ING出
- 30. MBean簡單圖形監視器
我喜歡這種方式,謝謝 –
優秀的答案,謝謝 – botkop
@ println中缺少@ vladimir-matveev's'。 '「$ name' - >'s」$ name' – lamusique