我對spark和scala很新穎,因此我有一些關於使用spark和使用rdds進行數據預處理的問題。 我正在開發一個小項目,我想用spark實現一個機器學習系統。使用這些算法是可以的,但我認爲在預處理數據時遇到了問題。 我有一個包含30列和大約一百萬行的數據集。但是,爲了簡單起見,讓我們假設我有以下的數據集(CSV文件):使用apache spark和scala進行數據預處理
columnA, columnB, column_txt, label
1 , a , abc , 0
2 , , abc , 0
3 , b , abc , 1
4 , b , abc , 1
5 , a , abc , 0
6 , , abc , 0
7 , c , abc , 1
8 , a , abc , 1
9 , b , abc , 1
10 , c , abc , 0
火花加載數據後,我想要做的步驟如下:
- 刪除所有以結束所有列「 _txt」
- 篩選出所有行columnB是空的(這個我想通了的話)
- 刪除有超過9級(從這裏columnA)
所以我有問題1和3. 我知道我不能刪除列,所以我不得不創建一個新的RDD,但我怎麼做沒有某些列? 現在我正在加載csv文件,但沒有火花頭,但爲我的任務,我需要。建議在單獨的rdd中加載標題嗎?但是,我該如何與該rdd進行交互以找到正確的列呢? 對不起,我知道很多問題,但我仍然在一開始就想學習。如果沒有頭裝
import org.apache.spark.sql.DataFrame
def moreThan9(df: DataFrame, col: String) = {
df.agg(countDistinct(col)).first()(0) match {
case x: Long => x > 9L
case _ => false
}
}
val newDf = df.
schema. // Extract schema
toArray. // Convert to array
map(_.name). // Map to names
foldLeft(df)((df: DataFrame, col: String) => {
if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df
})
: 感謝和問候, 克里斯
如果我是這樣做的「手工」,我會創建一個RDD,調用'.take(1)'在主服務器上獲取頭文件,對頭文件進行任何處理/並行處理,然後放下第一行以獲取RDD中的數據。 – lmm