2015-06-05 86 views
2

我有一個RDD,其中有很多列(例如數百個),我的大部分操作都在列上。我需要從不同列創建許多中間變量。Spark Spark RDD上的列操作

什麼是最有效的方式做到這一點?

例如,如果我的dataRDD[Array[String]]是象下面這樣:

123, 523, 534, ..., 893 
536, 98, 1623, ..., 98472 
537, 89, 83640, ..., 9265 
7297, 98364, 9, ..., 735 
...... 
29, 94, 956, ..., 758 

我將需要創建一個新的列或newCol1 = 2ndCol+19thCol變量,並根據newCol1另一個新柱和現有列:newCol2 = function(newCol1, 34thCol)

這樣做的最好方法是什麼?

我一直在使用索引的中間變量和dataRDD思考,然後加入他們一起對指數做我的計算:

var dataRDD = sc.textFile("/test.csv").map(_.split(",")) 
val dt = dataRDD.zipWithIndex.map(_.swap) 
val newCol1 = dataRDD.map(x => x(1)+x(18)).zipWithIndex.map(_.swap) 
val newCol2 = newCol1.join(dt).map(x=> function(.........)) 

是否有這樣做的更好的辦法?

回答

1

爲什麼不直接做這一切在一個:

var dataRDD = sc.textFile("/test.csv").map(_.split(",")) 
dataRDD.map(x=>{ 
    val newCol = x(1) + x(18) 
    val newCol2 = function(newCol, x(33)) 
    //anything else you need to do 
    newCol +: newCol2 +: x //This will return the original array with the new columns prepended 
    //x +: newCol +: newCol2 //Alternatively, this will return the original array with the new columns appended 
}) 
+0

感謝賈斯汀您的回覆。我可能誤解了你的觀點。但是,當我嘗試類似「dataRDD.map(x => {val a = x(1)})。collect」時,我得到了如Array [Unit] = Array((),(),(), ),(),(),(),(),(),())。我在這裏錯過了什麼嗎? – Carter

+0

對不起,我猜你對Scala並不太熟悉。函數中的最後一個語句是返回值。在列出的情況下,變量賦值的結果是一個Unit或()。我修改了我的代碼,使這個更明顯的如何完全做出改變 –

+0

謝謝賈斯汀!是的,我是Scala的新手,所以不熟悉一些概念。抱歉。 – Carter