2016-04-04 49 views
0

我正在學習virtualbox上的spark。我使用./bin/spark-shell打開spark並使用scala。現在我對使用scala的鍵值格式感到困惑。如何在spark中使用scala生成鍵值格式

我在家裏/鋒/火花/數據的txt文件,它看起來像:

panda 0 
pink 3 
pirate 3 
panda 1 
pink 4 

我用sc.textFile得到這個txt文件。如果我做

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7") 

那麼我可以用rdd.collect(),以顯示在屏幕上RDD:

scala> rdd.collect() 
res26: Array[String] = Array(panda 0, pink 3, pirate 3, panda 1, pink 4) 

但是,如果我這樣做

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7.txt") 

其中沒有」 .TXT 「 這裏。然後當我使用rdd.collect(),我得到一個錯誤:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/feng/spark/A.txt 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) 
...... 

但我看到了其他的例子。他們都在最後有「.txt」。我的代碼或系統有問題嗎?

另一件事是,當我試圖做的事:

scala> val rddd = rdd.map(x => (x.split(" ")(0),x)) 
rddd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:29 
scala> rddd.collect() 
res0: Array[(String, String)] = Array((panda,panda 0), (pink,pink 3), (pirate,pirate 3), (panda,panda 1), (pink,pink 4)) 

我打算選擇數據的第一列,並以此爲重點。但rddd.collect()看起來不是那種方式,因爲這個詞出現了兩次,這是不對的。我無法繼續進行其他操作,如mapbykey,reducebykey或其他操作。我在哪裏做錯了?

任何幫助wille真的很感激。

+0

你的問題似乎與你使用「.txt」有點不一致。你可以檢查你的文字 - 並插入你的代碼 - 確保它是完全正確的。如果是這樣,那麼你的系統似乎真的搞砸了。 – Phasmid

回答

1

就比如我創建了一個String數據集,在這之後我用線分割的記錄,並使用SparkContextparallelize方法來創建一個RDD。請注意,在創建RDD後,我使用其map方法拆分存儲在每條記錄中的String並將其轉換爲。

import org.apache.spark.sql.Row 
val text = "panda 0\npink 3\npirate 3\npanda 1\npink 4" 

val rdd = sc.parallelize(text.split("\n")).map(x => Row(x.split(" "):_*)) 
rdd.take(3) 

take方法的輸出是:

res4: Array[org.apache.spark.sql.Row] = Array([panda,0], [pink,3], [pirate,3]) 

關於你的第一個問題,就沒有必要對文件有任何擴展。因爲在這種情況下,文件被視爲純文本。

相關問題