2015-07-21 114 views
2

我對spark和scala很新穎,因此我有一些關於使用spark和使用rdds進行數據預處理的問題。 我正在開發一個小項目,我想用spark實現一個機器學習系統。使用這些算法是可以的,但我認爲在預處理數據時遇到了問題。 我有一個包含30列和大約一百萬行的數據集。但是,爲了簡單起見,讓我們假設我有以下的數據集(CSV文件):使用apache spark和scala進行數據預處理

columnA, columnB, column_txt, label 
1  , a  , abc  , 0 
2  ,  , abc  , 0 
3  , b  , abc  , 1 
4  , b  , abc  , 1 
5  , a  , abc  , 0 
6  ,  , abc  , 0 
7  , c  , abc  , 1 
8  , a  , abc  , 1 
9  , b  , abc  , 1 
10  , c  , abc  , 0 

火花加載數據後,我想要做的步驟如下:

  1. 刪除所有以結束所有列「 _txt」
  2. 篩選出所有行columnB是空的(這個我想通了的話)
  3. 刪除有超過9級(從這裏columnA)
那些列10

所以我有問題1和3. 我知道我不能刪除列,所以我不得不創建一個新的RDD,但我怎麼做沒有某些列? 現在我正在加載csv文件,但沒有火花頭,但爲我的任務,我需要。建議在單獨的rdd中加載標題嗎?但是,我該如何與該rdd進行交互以找到正確的列呢? 對不起,我知道很多問題,但我仍然在一開始就想學習。如果沒有頭裝

import org.apache.spark.sql.DataFrame 

def moreThan9(df: DataFrame, col: String) = { 
    df.agg(countDistinct(col)).first()(0) match { 
     case x: Long => x > 9L 
     case _ => false 
    } 
} 

val newDf = df. 
    schema. // Extract schema 
    toArray. // Convert to array 
    map(_.name). // Map to names 
    foldLeft(df)((df: DataFrame, col: String) => { 
     if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df 
    }) 

: 感謝和問候, 克里斯

+0

如果我是這樣做的「手工」,我會創建一個RDD,調用'.take(1)'在主服務器上獲取頭文件,對頭文件進行任何處理/並行處理,然後放下第一行以獲取RDD中的數據。 – lmm

回答

1

假設數據幀裝有頭和結構是平的:

val df = sqlContext. 
    read. 
    format("com.databricks.spark.csv"). 
    option("header", "true"). 
    load("data.csv") 

這樣的事情應該工作那麼你可以使用映射從自動分配到實際做同樣的事情。

+0

非常感謝您的回答!我會在接下來的幾天內嘗試。 – csnr

+0

爲什麼foldLeft而不是過濾? – dskrvk

+0

@dskrvk你的意思是類似'.filter(...)。foreach(col => df = df.drop(col))''?簡單來說,我猜想的是參照透明度和個人品味。 – zero323

相關問題