2016-02-05 144 views
3

我嘗試阿卡流和這裏的大型文件是一小段,我有:閱讀使用阿卡流

override def main(args: Array[String]) { 
    val filePath = "/Users/joe/Softwares/data/FoodFacts.csv"//args(0) 

    val file = new File(filePath) 
    println(file.getAbsolutePath) 
    // read 1MB of file as a stream 
    val fileSource = SynchronousFileSource(file, 1 * 1024 * 1024) 
    val shaFlow = fileSource.map(chunk => { 
     println(s"the string obtained is ${chunk.toString}") 
    }) 
    shaFlow.to(Sink.foreach(println(_))).run // fails with a null pointer 

    def sha256(s: String) = { 
     val messageDigest = MessageDigest.getInstance("SHA-256") 
     messageDigest.digest(s.getBytes("UTF-8")) 
    } 
    } 

當我跑了這個片段,我得到:

Exception in thread "main" java.lang.NullPointerException 
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:365) 
    at com.test.api.consumer.DataScienceBoot$.main(DataScienceBoot.scala:30) 
    at com.test.api.consumer.DataScienceBoot.main(DataScienceBoot.scala) 

在我看來,它不是fileSource只是空的?爲什麼是這樣?有任何想法嗎? FoodFacts.csv如果大小爲40MB,我所要做的就是創建一個1MB的數據流!

即使使用defaultChunkSize 8192也不行!

+0

您使用的是什麼版本的阿卡流?我認爲SynchronousFileSource現在已被棄用 – Jatin

+0

我使用1.0。我應該使用哪一個閱讀一個大文件並將這些塊作爲流傳遞?任何線索? – sparkr

回答

4

以及1.0已棄用。如果可以,請使用2.x

當我使用2.0.1版本使用FileIO.fromFile(file)而不是SynchronousFileSource進行嘗試時,它是編譯失敗,消息爲fails with null pointer。這僅僅是因爲它的範圍沒有ActorMaterializer。包括它,使其工作:

object TImpl extends App { 
import java.io.File 

    implicit val system = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    val file = new File("somefile.csv") 
    val fileSource = FileIO.fromFile(file,1 * 1024 * 1024) 
    val shaFlow: Source[String, Future[Long]] = fileSource.map(chunk => { 
    s"the string obtained is ${chunk.toString()}" 
    }) 

    shaFlow.runForeach(println(_))  
} 

這適用於任何大小的文件。有關調度程序配置的更多信息,請參閱here