2016-10-03 56 views
0

我試圖使用火花流從HDFS讀取數據。 以下是我的代碼。使用Sparkstreaming從HDFS獲取數據

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.hadoop.fs._ 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

val sparkConf = new SparkConf() 
val ssc = new StreamingContext(sparkConf, Seconds(10)) 

val directory ="hdfs://pc-XXXX:9000/hdfs/watchdirectory/" 
val lines=ssc.fileStream[LongWritable, Text, TextInputFormat](directory, (t:Path) => true, true).map(_._2.toString) 
lines.count() 
lines.print() 
ssc.start 
ssc.awaitTermination() 

代碼運行但它不讀取HDFS中的任何數據。 每隔10秒鐘後,我會得到一個空行。

我已經通過fileStream的文檔,我知道我已經將文件移動到監視目錄。 但它不適合我。 我也嘗試過使用textFileStream,但沒有運氣。

我使用斯卡拉內置的spark 2.0.0 2.11.8

請任何建議。

+0

你是如何嘗試textFileStream,發佈您的代碼,請 –

+0

有沒有什麼大的差別,但這裏是我改變了一行代碼。 'val lines = ssc.textFileStream(「hdfs:// pc-XXXX:9000/hdfs/watchdirectory /」)' – neoguy

+0

使用textFileStream並在運行spark程序後移動文件,如果文件已經存在,它將不會選擇。 –

回答

0

請嘗試以下

val sparkConf = new SparkConf() 
val ssc = new StreamingContext(sparkConf, Seconds(10)) 

val lines= ssc.textFileStream("hdfs://pc-XXXX:9000/hdfs/watchdirectory/").map(_._2.toString) 
lines.count() 
lines.print() 
ssc.start 
ssc.awaitTermination() 

一旦執行此,將文件移動到

/hdfs/watchdirectory/ 
+0

在發佈問題之前,我已經發現了這個問題,但它對我不起作用 – neoguy

+0

你好嗎?哪種模式?本地,YARN,梅森,獨自站立? –

+0

嘗試使用本地和獨立 – neoguy