我已經構建了一個定義流程的阿卡圖。我的目標是重新格式化未來的響應並將其保存到文件中。該流程可波紋管進行概述:你如何處理Akka Flow的未來?
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false))
val merger = builder.add(Merge[Future[Map[String, String]]](6))
val fileSink = FileIO.toPath(outputPath, options)
val ignoreSink = Sink.ignore
val in = Source(seeds)
in ~> balancer.in
for (i <- Range(0,6)) {
balancer.out(i) ~>
wikiFlow.async ~>
// This maps to a Future[Map[String, String]]
Flow[(Try[HttpResponse], String)].map(parseHtml) ~>
merger
}
merger.out ~>
// When we merge we need to map our Map to a file
Flow[Future[Map[String, String]]].map((d) => {
// What is the proper way of serializing future map
// so I can work with it like a normal stream into fileSink?
// I could manually do ->
// d.foreach(someWriteToFileProcess(_))
// with ignoreSink, but this defeats the nice
// akka flow
}) ~>
fileSink
ClosedShape
})
我可以破解這個工作流來寫我的未來地圖通過的foreach一個文件,但恐怕這可能在某種程度上導致與FileIO專注併發性問題,它只是不感覺不錯。用我們的akka流程處理未來的正確方法是什麼?
的[一個阿卡流內創建從未來背壓](可能的複製http://stackoverflow.com/questions/39909303/create-backpressure-from-a -future-inside-a-akka-stream) –