2017-09-26 31 views
0

我使用文件作爲Spark流,我想計算流中的單詞,但應用程序不打印任何內容,這是我的代碼。我使用Scala的上Cloudera的環境使用filstream的Spark流wordcount不打印結果

import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext 

object TwitterHashtagStreaming { 

def main(args: Array[String]) : Unit = { 

val conf = new SparkConf().setAppName("TwitterHashtagStreaming").setMaster("local[2]").set("spark.executor.memory","1g"); 

val streamingC = new StreamingContext(conf,Seconds(5)) 

val streamLines = streamingC.textFileStream("file:///home/cloudera/Desktop/wordstream") 
val words = streamLines.flatMap(_.split(" ")) 
val counts = words.map(word => (word, 1)).reduceByKey(_ + _) 

counts.print() 

streamingC.start() 
streamingC.awaitTermination() 
} 

} 
+0

什麼是打印?任何錯誤? –

+0

不,只是時間,好像計數是空的 -------------------------------------- ----- Time:1506415275000 ms --------------------------------------- ---- –

+0

首先嚐試在進行字數統計之前打印streamLines,以確保數據是否已被讀取。 –

回答

1

請仔細參考了document

def textFileStream(directory: String): DStream[String] 

創建監視 新文件的Hadoop兼容的文件系統和讀取它們作爲文本輸入流文件(使用Key作爲LongWritable, 值爲Text和TextInputFormat的輸入格式)。通過從同一文件系統中的另一個 位置「移動」它們,文件必須被 寫入受監視目錄。文件名稱以。 被忽略。

總之,它是一個變化檢測器,你必須開始你的流媒體服務,然後將你的數據寫入你的monitor目錄。

這種語義將模擬「流概念」時,它實際上是部署在生產環境中,例如網絡數據包會逐漸像收入您的文件。