2017-09-11 151 views
0

我有一個主要的SQL表,我正在讀入Spark並修改爲寫入CassandraDB。目前,我有一個將性別從0,1,2,3(整數)轉換爲「男性」,「女性」,「Trans」等(字符串)的工作實現。雖然下面的方法做工作,似乎非常低效,使那些映射到數據幀一個單獨的陣列,它加入到主表/數據幀,然後刪除,重命名等Scala/Apache Spark轉換DataFrame列值和類型,否則爲多個

我看到:

.withColumn("gender", when(col("gender) === 1, "male").otherwise("female") 

這將允許我繼續在主表上進行方法鏈接,但無法使其處理超過2個選項。有沒有辦法做到這一點?我在這個表上有大約10個不同的列,每個列都需要自己創建的自定義轉換。由於此代碼將處理數據TB,是否存在重複性更低且更有效的方法來完成此操作。感謝您提前提供任何幫助!

case class Gender(tmpid: Int, tmpgender: String) 

private def createGenderDf(spark:SparkSession): DataFrame = { 
    import spark.implicits._ 
    Seq(
    Gender(1, "Male"), 
    Gender(2, "Female"), 
    Gender(777, "Prefer not to answer") 
).toDF 
} 


private def createPersonsDf(spark: SparkSession): DataFrame = { 
    val genderDf = createGenderDf(spark) 
    genderDf.show() 

    val personsDf: DataFrame = spark.read 
    .format("csv") 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .option("delimiter", "\t") 
    .load(dataPath + "people.csv") 
    .withColumnRenamed("ID", "id") 
    .withColumnRenamed("name_first", "firstname") 

    val personsDf1: DataFrame = personsDf 
    .join(genderDf, personsDf("gender") === genderDf("tmpid"), "leftouter") 

    val personsDf2: DataFrame = personsDf1 
    .drop("gender") 
    .drop("tmpid") 
    .withColumnRenamed("tmpgender", "gender") 
} 

回答

0

您可以使用嵌套when功能,將消除您創建genderDfjoindroprename等,至於你的榜樣的需要時,你可以做以下

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.StringType 
personsDf.withColumn("gender", when(col("gender") === 1, "male").otherwise(when(col("gender") ===2, "female").otherwise("Prefer not to answer")).cast(StringType)) 

您可以添加更多when函數在上面的嵌套結構中,並且您可以爲其他10列重複相同的操作。

+0

有沒有辦法做到這一點在性別列中已經存在(它被定義爲Int類型)與某種列類型轉換,或者我將不得不創建一個新的列類型字符串,然後有條件地設置值該列基於性別列?我想這第二個選項還需要刪除列,然後重新命名列... –

+0

withColumn將創建一個新的列,如果性別列不存在,它將替換列中的值,如果性別列已經存在。要更改withColumn api中列的數據類型,可以使用強制轉換。請看我更新的回答 –

+0

謝謝你的幫助!很好的解答,正是我所需要的 –

相關問題