2015-05-06 45 views
2

我試圖剝離通過saveAsTextFile保存的CSV數據中的包裝類或數組文本,而無需執行非Spark後處理步驟。格式(刪除class/parens)Spark CSV saveAsTextFile輸出?

我在一些大文件中有一些TSV數據,我將這些數據提供給RDD。

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => Test(x(0),x(1))) 

testRdd.saveAsTextFile("test") 

這樣可以節省類名包數據:

head -n 1 part-00000 
Test("1969720fb3100608b38297aad8b3be93","active") 

我也試過它消耗到一個無名類,而不是一個案例類(?)。

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => (x(0),x(1))) 

testRdd.saveAsTextFile( 「test2的」)

這產生

("1969720fb3100608b38297aad8b3be93","active") 

這仍然需要以除去纏繞的括號後處理。

爲了剝離包裹字符,我試過flatMap(),但RDD顯然是不正確的類型:

testRdd.flatMap(identity).saveAsTextFile("test3") 
<console>:17: error: type mismatch; 
found : ((String, String)) => (String, String) 
required: ((String, String)) => TraversableOnce[?] 
       testRdd.flatMap(identity).saveAsTextFile("test3") 

所以...我需要的RDD轉換成一些其他類型的RDD,還是有另一種方式將RDD保存爲CSV,這樣封裝文本就被剝離了?

謝謝!

回答

1

你可以嘗試以下方法:

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")) 
           .map(x => x.toLowerCase.split('\t')) 
           .map(x => x(0)+","+x(1)) 

我們所做的聽到的是過濾你的頭後,就可以小寫的字符串中的相同的地圖通道也保存了一些不必要的額外映射

這將創建一個可保存爲CSV格式的RDD [字符串]。

PS:

  • 保存RDD輸出的延伸是不是一個CSV但格式!

  • 這不是最佳和唯一的解決方案,但它會爲你做的工作!

+1

完善。感謝您不僅回答問題,而且指出在單個map()調用中執行多個步驟的簡化。 – user2029783

+0

不客氣! – eliasah

2
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => x(0)+","+x(1)) 

這將寫入輸出爲CSV

0

VAL LOGFILE = 「/input.csv」

VAL CONF =新SparkConf()。組( 「spark.driver.allowMultipleContexts」, 「真」)

VAL SC =新SparkContext(主=「local」,appName =「Mi app」,conf)

val logData = sc。文本文件(日誌文件,2).cache()

VAL低級= logData.map(線=> line.toLowerCase)