我有一個文件列表。我想要:Akka流:讀取多個文件
- 將它們全部作爲單個來源讀取。
- 應按順序讀取文件。 (不循環)
- 任何文件都不應該被要求完全在內存中。
- 從文件中讀取錯誤應該摺疊流。
這感覺就像這應該工作:(斯卡拉,阿卡流v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但是,由於在一個FileIO
編譯錯誤結果已與其相關聯的物化價值,並沒有按Source.combine
不支持。
映射物化價值遠讓我不知道文件的讀取錯誤是如何被處理,但並編譯:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但在運行時會拋出IllegalArgumentException:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
我正在尋找模塊,所以我明白這一點。我使用行數作爲我可以對文件進行處理的一個例子,並且將'lineCounter'寫爲文件讀取。 (它是一個水槽)但是如果我將摺疊和其他所有東西都移動到其他地方,我會留下一個Flow [Path,String,NotUsed],這正是我所尋找的。 – randomstatistic
能否請您提供您的示例的導入,他們是代碼的重要組成部分。 –
@OsskarWerrewka它應該都在akka.stream.scaladsl和java IO/NIO中。你有問題嗎? –