2017-04-14 80 views
1

我嘗試將wordcount結果保存在文件中。值saveAsTextFile不是org.apache.spark.streaming.dstream.DStream [(String,Long)]的成員]

val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) 
wordCounts.saveAsTextFile("/home/hadoop/datafile1") 

卻是露出

value saveAsTextFile is not a member of org.apache.spark.streaming.dstream.DStream[(String, Long)]    [error]  wordCounts.saveAsTextFile("/home/hadoop/datafile1") 

我使用的火花2.1。我展示了一個建議老火花版本的答案,但我想在火花2.1中做。謝謝。

回答

0

您在DStream上使用的定義方法爲RDD

這是RDD方法:

def saveAsTextFile(path: String): Unit 

...與描述 「保存此RDD爲一個文本文件,使用元素的字符串表示。」

這是DStream方法:在此DSTREAM作爲文本文件

saveAsTextFiles(prefix: String, suffix: String = ""): Unit 

...與描述「保存每個RDD,使用元素的字符串表示在每批間隔的文件名是。根據前綴和後綴生成:「prefix-TIME_IN_MS.suffix。

因此該方法的簽名是不同的。 - 無論是在名稱和參數

在你的代碼,wordCounts顯然是DStream,因此它不具有saveAsTextFile方法

然而,我得到你對抽象概念感到困惑的感覺,並且確實想要寫出包含在DStream microbatch中的個人RDD。要做到這一點:

counts.foreachRDD { rdd => 
    ...   
    rdd.saveAsTextFiles(s"/home/hadoop/datafile-$timestamp") 

} 
0

API documentation提到API爲 「saveAsTextFiles」

saveAsTextFiles(String prefix, String suffix) 

保存每個RDD在此DSTREAM如在文本文件中,使用字符串元素的 表示。

相關問題