我試圖使用火花流從HDFS讀取數據。 以下是我的代碼。使用Sparkstreaming從HDFS獲取數據
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.fs._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(10))
val directory ="hdfs://pc-XXXX:9000/hdfs/watchdirectory/"
val lines=ssc.fileStream[LongWritable, Text, TextInputFormat](directory, (t:Path) => true, true).map(_._2.toString)
lines.count()
lines.print()
ssc.start
ssc.awaitTermination()
代碼運行但它不讀取HDFS中的任何數據。 每隔10秒鐘後,我會得到一個空行。
我已經通過fileStream的文檔,我知道我已經將文件移動到監視目錄。 但它不適合我。 我也嘗試過使用textFileStream,但沒有運氣。
我使用斯卡拉內置的spark 2.0.0 2.11.8
請任何建議。
你是如何嘗試textFileStream,發佈您的代碼,請 –
有沒有什麼大的差別,但這裏是我改變了一行代碼。 'val lines = ssc.textFileStream(「hdfs:// pc-XXXX:9000/hdfs/watchdirectory /」)' – neoguy
使用textFileStream並在運行spark程序後移動文件,如果文件已經存在,它將不會選擇。 –