2017-06-15 97 views
1

我正在學習如何使用Spark/Scala讀取和寫入HDFS中的文件。 我無法寫入HDFS文件,該文件已創建,但它是空的。 我不知道如何創建一個循環來寫入文件。使用Spark/Scala在HDFS文件中使用迭代寫入

的代碼是:

import scala.collection.immutable.Map 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 

// Read the adult CSV file 
    val logFile = "hdfs://zobbi01:9000/input/adult.csv" 
    val conf = new SparkConf().setAppName("Simple Application") 
    val sc = new SparkContext(conf) 
    val logData = sc.textFile(logFile, 2).cache() 


    //val logFile = sc.textFile("hdfs://zobbi01:9000/input/adult.csv") 
    val headerAndRows = logData.map(line => line.split(",").map(_.trim)) 
    val header = headerAndRows.first 
    val data = headerAndRows.filter(_(0) != header(0)) 
    val maps = data.map(splits => header.zip(splits).toMap) 
    val result = maps.filter(map => map("AGE") != "23") 

    result.foreach{ 

     result.saveAsTextFile("hdfs://zobbi01:9000/input/test2.txt") 
    } 

如果我更換: result.foreach{println}

然後它的作品!

但使用(saveAsTextFile)的方法時,則一個錯誤消息被拋出

<console>:76: error: type mismatch; 
found : Unit 
required: scala.collection.immutable.Map[String,String] => Unit 
      result.saveAsTextFile("hdfs://zobbi01:9000/input/test2.txt") 

任何幫助請。

回答

1
result.saveAsTextFile("hdfs://zobbi01:9000/input/test2.txt") 

這就是你需要做的。你不需要遍歷所有的行。

希望這會有所幫助!

+0

如果這有幫助,你能接受這個答案嗎? –

1

這是幹什麼!

result.foreach{ 
    result.saveAsTextFile("hdfs://zobbi01:9000/input/test2.txt") 
} 

RDD action不能從RDD transformations,除非特殊的conf集觸發。您可以使用result.saveAsTextFile("hdfs://zobbi01:9000/input/test2.txt")保存到HDFS。

如果您需要寫入文件中的其他格式,請在寫入之前在rdd本身中進行更改。

+0

與其他答案有什麼不同? –

+0

恭喜! Yu先回答!!! :) 當我打開標籤時,我看到你回答。無論如何,這無助於問題。 –

+0

非常感謝。有用。當我真正保存到(test2.txt)時,這是我的錯誤,認爲它是一個文件。事實上,它是一個文件夾,所以我改爲(test2) –