2017-02-08 16 views
0

爲了能夠使用我的DataFrame的列名而不轉義.我需要一個函數來「驗證」所有的列名 - 但我嘗試的方法都沒有在及時(我5分鐘後中止)。Scala Spark:性能問題重命名大量列

我正在嘗試我的算法的數據集是golub數據集(獲取它here)。這是一個具有7200列的2.2MB CSV文件。重命名所有列應該是秒

代碼讀取CSV在

var dfGolub = spark.read 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .csv("golub_merged.csv") 
    .drop("_c0") // drop the first column 
    .repartition(numOfCores) 

嘗試重命名列的問題:

def validifyColumnnames1(df : DataFrame) : DataFrame = { 
    import org.apache.spark.sql.functions.col 
    val cols = df.columns 
    val colsRenamed = cols.map(name => col(name).as(name.replaceAll("\\.",""))) 
    df.select(colsRenamed : _*) 
} 


def validifyColumnnames2[T](df : Dataset[T]) : DataFrame = { 
    val newColumnNames = ArrayBuffer[String]() 
    for(oldCol <- df.columns) { 
     newColumnNames += oldCol.replaceAll("\\.","") 
    } 
    df.toDF(newColumnNames : _*) 
} 

def validifyColumnnames3(df : DataFrame) : DataFrame = { 
    var newDf = df 
    for(col <- df.columns){ 
     newDf = newDf.withColumnRenamed(col,col.replaceAll("\\.","")) 
    } 
    newDf 
} 

任何想法是什麼原因造成這種性能問題?

設置:我在Ubuntu 16.04中local[24]模式的機器上運行星火2.1.0與16cores * 2個線程的RAM

+6

讀取沒有列名稱的數據爲RDD,然後只讀取作爲架構的列名稱。結合架構和RDD來獲得你的DF。 – toofrellik

回答

2

96GB和假設你知道的類型,你可以簡單地創建模式,而不是infering的它(推斷模式成本的性能,甚至可能是錯誤的csv)。

讓我們假設爲簡單起見,你有文件example.csv如下:事先可以

val scehma = StructType(Seq(StructField("A_B",StringType),StructField("A_C", IntegerType), StructField("AD", IntegerType))) 
val df = spark.read.option("header","true").schema(scehma).csv("example.csv") 
df.show() 

+---+---+---+ 
|A_B|A_C| AD| 
+---+---+---+ 
| a| 3| 1| 
+---+---+---+ 

如果你不知道的信息:

A.B, A.C, A.D 
a,3,1 

你可以做這樣的事情如前所述使用推理架構,那麼您可以使用數據框生成架構:

val fields = for { 
    x <- df.schema 
} yield StructField(x.name.replaceAll("\\.",""), x.dataType, x.nullable) 
val schema = StructType(fields) 

並重讀使用該架構的數據幀,如前所述