2016-10-03 245 views
0

我正在使用Spark數據幀。在我的數據框中有很多級別的分類變量。我試圖對這個變量進行一個簡單的轉換 - 只挑選大於n個觀察值的前幾個級別(比如1000)。將所有其他級別分成「其他」類別。轉換火花數據幀列

我對Spark很新,所以我一直在努力實現這一點。這是我迄今能夠實現的:

# Extract all levels having > 1000 observations (df is the dataframe name) 
val levels_count = df.groupBy("Col_name").count.filter("count >10000").sort(desc("count")) 

# Extract the level names 
val level_names = level_count.select("Col_name").rdd.map(x => x(0)).collect 

這給了我一個數組,其中有我想保留的級別名稱。接下來,我應該定義可應用於列的轉換函數。這是我陷入困境的地方。我相信我們需要創建一個用戶定義的功能。這是我的嘗試:

# Define UDF 
val var_transform = udf((x: String) => { 
    if (level_names contains x) x 
    else "others" 
}) 

# Apply UDF to the column 
val df_new = df.withColumn("Var_new", var_transform($"Col_name")) 

然而,當我嘗試df_new.show它拋出一個「任務不序列化」異常。我究竟做錯了什麼?另外,有沒有更好的方法來做到這一點?

謝謝!

+0

讓我知道level_count.select(「Col_name」)。rdd.map(x => x(0))的輸出。collect –

+0

@ArunakiranNulu一系列具有我希望保留的值的值 – Dataminer

回答

1

這裏是會是這樣,我認爲一個解決方案,對於這樣一個簡單的轉換更好:堅持以優化數據框API和信任催化劑和鎢(例如制定廣播加入):

val levels_count = df 
    .groupBy($"Col_name".as("new_col_name")) 
    .count 
    .filter("count >10000") 

val df_new = df 
    .join(levels_count,$"Col_name"===$"new_col_name", joinType="leftOuter") 
    .drop("Col_name") 
    .withColumn("new_col_name",coalesce($"new_col_name", lit("other"))) 
+0

這使我類型不匹配錯誤.... found:String(「other」) 必需:org.apache.spark.sql.Column – Dataminer

+0

哈,對不起,我在黑暗中寫道它。在這種情況下,您可以使用org.apache.spark.sql.functions.lit函數從文字創建列。 – Wilmerton

+0

在joinType上進行第二次編輯: - | – Wilmerton