2016-06-13 56 views
5

我有一個文件列表。我想要:Akka流:讀取多個文件

  1. 將它們全部作爲單個來源讀取。
  2. 應按順序讀取文件。 (不循環)
  3. 任何文件都不應該被要求完全在內存中。
  4. 從文件中讀取錯誤應該摺疊流。

這感覺就像這應該工作:(斯卡拉,阿卡流v2.4.7)

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

但是,由於在一個FileIO編譯錯誤結果已與其相關聯的物化價值,並沒有按Source.combine不支持。

映射物化價值遠讓我不知道文件的讀取錯誤是如何被處理,但並編譯:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
    .mapMaterializedValue(f => NotUsed.getInstance()) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

但在運行時會拋出IllegalArgumentException:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out] 

回答

8

爲了清楚地模塊化不同的問題,下面的代碼並不儘可能簡潔。

// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings 
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String) 

// given as stream of Paths we read those files and count the number of lines 
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right) 

// Here's our test data source (replace paths with real paths) 
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath)) 

// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes 
testFiles runWith lineCounter foreach println 
+0

我正在尋找模塊,所以我明白這一點。我使用行數作爲我可以對文件進行處理的一個例子,並且將'lineCounter'寫爲文件讀取。 (它是一個水槽)但是如果我將摺疊和其他所有東西都移動到其他地方,我會留下一個Flow [Path,String,NotUsed],這正是我所尋找的。 – randomstatistic

+0

能否請您提供您的示例的導入,他們是代碼的重要組成部分。 –

+1

@OsskarWerrewka它應該都在akka.stream.scaladsl和java IO/NIO中。你有問題嗎? –

-1

我有一個答案走出大門 - 不要使用akka.FileIO。這似乎工作正常,例如:

val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _) 
val source = Source.fromIterator[String](() => sources) 
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 

我還想知道是否有更好的解決方案。

+0

通過使用'io.Source'你失去了很多的權力。對於小文件,這可能會起作用,但它不適用於大文件。 – jarandaf

+0

@jarandaf你能澄清嗎?我的印象是,io.Source只是使用了BufferedReader,而getLines迭代器不會立即加載整個文件或類似的東西。 – randomstatistic

+0

更好的想法,你可能是對的(雖然'FileIO'處理'ByteString'而不是'String',這意味着更高性能)。另一方面,使用'io.Source'時,總是要記住關閉源代碼(默認情況下不會這樣做)。 – jarandaf

2

更新哦,我沒看到接受的答案,因爲我沒有刷新頁面> _ <。因爲我還添加了一些關於錯誤處理的註釋,所以我會在這裏留下它。

我相信下面的程序你想要做什麼:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, IOResult} 
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source} 
import akka.util.ByteString 
import scala.concurrent.{Await, Future} 
import scala.util.{Failure, Success} 
import scala.util.control.NonFatal 
import java.nio.file.Paths 
import scala.concurrent.duration._ 

object TestMain extends App { 
    implicit val actorSystem = ActorSystem("test") 
    implicit val materializer = ActorMaterializer() 
    implicit def ec = actorSystem.dispatcher 

    val sources = Vector("build.sbt", ".gitignore") 
    .map(Paths.get(_)) 
    .map(p => 
     FileIO.fromPath(p) 
     .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left) 
     .mapMaterializedValue { f => 
      f.onComplete { 
      case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p") 
      case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}") 
      case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e") 
      } 
      NotUsed 
     } 
    ) 
    val finalSource = Source(sources).flatMapConcat(identity) 

    val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 
    result.onComplete { 
    case Success(n) => println(s"Read $n lines total") 
    case Failure(e) => println(s"Reading failed: $e") 
    } 
    Await.ready(result, 10.seconds) 

    actorSystem.terminate() 
} 

這裏的關鍵是flatMapConcat()方法:它改變流的每個元素爲源,並返回這些資源如果得到元素的流它們按順序運行。

至於處理錯誤,您可以在mapMaterializedValue參數中爲未來添加處理程序,也可以通過將處理程序置於物化未來值上來處理運行流的最終錯誤。我在上面的例子中都做過了,如果你測試它,比如在一個不存在的文件上,你會看到相同的錯誤信息會被打印兩次。不幸的是,flatMapConcat()沒有收集物化值,坦率地說,我看不出它能夠做到這一點的方式,因此如有必要,您必須單獨處理它們。