2017-02-06 34 views
4

我已經構建了一個定義流程的阿卡圖。我的目標是重新格式化未來的響應並將其保存到文件中。該流程可波紋管進行概述:你如何處理Akka Flow的未來?

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
     import GraphDSL.Implicits._ 
     val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false)) 
     val merger = builder.add(Merge[Future[Map[String, String]]](6)) 
     val fileSink = FileIO.toPath(outputPath, options) 
     val ignoreSink = Sink.ignore 
     val in = Source(seeds) 
     in ~> balancer.in 
     for (i <- Range(0,6)) { 
     balancer.out(i) ~> 
      wikiFlow.async ~> 
      // This maps to a Future[Map[String, String]] 
      Flow[(Try[HttpResponse], String)].map(parseHtml) ~> 
      merger 
     } 

     merger.out ~> 
     // When we merge we need to map our Map to a file 
     Flow[Future[Map[String, String]]].map((d) => { 
     // What is the proper way of serializing future map 
     // so I can work with it like a normal stream into fileSink? 

     // I could manually do -> 
     // d.foreach(someWriteToFileProcess(_)) 
     // with ignoreSink, but this defeats the nice 
     // akka flow 
     }) ~> 
     fileSink 

     ClosedShape 
    }) 

我可以破解這個工作流來寫我的未來地圖通過的foreach一個文件,但恐怕這可能在某種程度上導致與FileIO專注併發性問題,它只是不感覺不錯。用我們的akka​​流程處理未來的正確方法是什麼?

+0

的[一個阿卡流內創建從未來背壓](可能的複製http://stackoverflow.com/questions/39909303/create-backpressure-from-a -future-inside-a-akka-stream) –

回答

9

創建涉及異步計算的Flow的最簡單方法是使用mapAsync

所以......可以說你想創建一個Flow消耗Int和使用異步計算mapper: Int => Future[String]與5

val mapper: Int => Future[String] = (i: Int) => Future(i.toString) 

val yourFlow = Flow[Int].mapAsync[String](5)(mapper) 

的平行度,現可生產String,你可以在你的圖形使用這個流程隨你怎麼便。

一個例子的使用將是,

val graph = GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val intSource = Source(1 to 10) 

    val printSink = Sink.foreach[String](s => println(s)) 

    val yourMapper: Int => Future[String] = (i: Int) => Future(i.toString) 

    val yourFlow = Flow[Int].mapAsync[String](2)(yourMapper) 

    intSource ~> yourFlow ~> printSink 

    ClosedShape 
} 
+0

今晚稍後我會檢查一下。不知道mapAsync,我沒有在他們的指南中看到它。謝謝! –