3
我嘗試阿卡流和這裏的大型文件是一小段,我有:閱讀使用阿卡流
override def main(args: Array[String]) {
val filePath = "/Users/joe/Softwares/data/FoodFacts.csv"//args(0)
val file = new File(filePath)
println(file.getAbsolutePath)
// read 1MB of file as a stream
val fileSource = SynchronousFileSource(file, 1 * 1024 * 1024)
val shaFlow = fileSource.map(chunk => {
println(s"the string obtained is ${chunk.toString}")
})
shaFlow.to(Sink.foreach(println(_))).run // fails with a null pointer
def sha256(s: String) = {
val messageDigest = MessageDigest.getInstance("SHA-256")
messageDigest.digest(s.getBytes("UTF-8"))
}
}
當我跑了這個片段,我得到:
Exception in thread "main" java.lang.NullPointerException
at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:365)
at com.test.api.consumer.DataScienceBoot$.main(DataScienceBoot.scala:30)
at com.test.api.consumer.DataScienceBoot.main(DataScienceBoot.scala)
在我看來,它不是fileSource只是空的?爲什麼是這樣?有任何想法嗎? FoodFacts.csv如果大小爲40MB,我所要做的就是創建一個1MB的數據流!
即使使用defaultChunkSize 8192也不行!
您使用的是什麼版本的阿卡流?我認爲SynchronousFileSource現在已被棄用 – Jatin
我使用1.0。我應該使用哪一個閱讀一個大文件並將這些塊作爲流傳遞?任何線索? – sparkr