2017-09-02 93 views
0

我想弄清楚如何處理在你的一個階段中你需要進行一個返回InputStream的調用,在那裏我將處理該流作爲舞臺的來源進一步下降。流內的Akka-Stream流

例如

Source.map(e => Calls that return an InputStream) 
.via(processingFlow).runwith(sink.ignore) 

我想該元素將處理流程那些從InputStream到來。這基本上是我拖尾一個文件,讀取每一行,這一行給我關於一個調用,我需要對一個CLI api進行的信息的情況下,當進行該調用時,我將Stdout作爲一個InputStream從中讀取結果。結果在大部分時間都會很大,所以我可以只收集記憶中的所有東西。

中號

回答

1
  • 可以使用StreamConverters事業得到Source S和從java.io流Sink秒。更多信息here
  • 您可以使用flatMapConcatflatMapMergeSource s的流平鋪爲單個流。更多信息here

一個簡單的例子可以是:

val source: Source[String, NotUsed] = ??? 
    def gimmeInputStream(name: String): InputStream = ??? 
    val processingFlow: Flow[ByteString, ByteString, NotUsed] = ??? 

    source 
    .map(gimmeInputStream) 
    .flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192)) 
    .via(processingFlow) 
    .runWith(Sink.ignore) 

但是阿卡流提供了一個更地道DSL在FileIO對象爲讀/寫文件。更多信息here

示例變爲:

val source: Source[String, NotUsed] = ??? 
    val processingFlow: Flow[ByteString, ByteString, NotUsed] = ??? 

    source 
    .flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name))) 
    .via(processingFlow) 
    .runWith(Sink.ignore)