2017-09-12 64 views
0

在分析的數據縮減階段,我想要刪除列總數低於所有列總計中值的所有列。 因此,與數據集:scala:刪除列值低於所有列的中值的列

v1,v2,v3 
1 3 5 
3 4 3 

我之列

v1,v2,v3 
4 7 8 

中值是7,所以我放下V1

v2,v3 
3 5 
4 3 

我以爲我可以與排流功能做到這一點。但這似乎不可能。

代碼我已經拿出了作品,但它看起來很冗長,看起來很像Java代碼(我認爲這是我做錯了的一個標誌)。

是否有任何更有效的方法來執行此操作?

val val dfv2=DataFrameUtils.openFile(spark,"C:\\Users\\jake\\__workspace\\R\\datafiles\\ikodaDataManipulation\\VERB2.csv") 

    //return a single row dataframe with sum of each column 
    val dfv2summed:DataFrame=dfv2.groupBy().sum() 

    logger.info(s"dfv2summed col count is ${dfv2summed.schema.fieldNames.length}") 


    //get the rowValues 
    val rowValues:Array[Long]=dfv2summed.head().getValuesMap(dfv2summed.schema.fieldNames).values.toArray 

    //sort the rows 
    scala.util.Sorting.quickSort(rowValues) 

    //calculate medians (simplistically) 
    val median:Long = rowValues(rowValues.length/2) 

    //ArrayBuffer to hold column needs that need removing 
    var columnArray: ArrayBuffer[String] = ArrayBuffer[String]() 

    //get tuple key value pairs of columnName/value 
    val entries: Map[String, Long]=dfv2summed.head().getValuesMap(dfv2summed.schema.fieldNames) 
    entries.foreach 
    { 

    //find all columns where total value below median value 
    kv => 

     if(kv._2.<(median)) 
     { 
      columnArray+=kv._1 
     } 
    } 

    //drop columns 
    val dropColumns:Seq[String]=columnArray.map(s => s.substring(s.indexOf("sum(")+4,s.length-1)).toSeq 
    logger.info(s"todrop ${dropColumns.size} : ${dropColumns}") 
    val reducedDf=dfv2.drop(dropColumns: _*) 
    logger.info(s"reducedDf col count is ${reducedDf.schema.fieldNames.length}") 
+0

請分享數據。例如,做預期輸出 – mtoto

+0

。請參閱 – Jake

回答

1

計算Spark每一列的總和之後,我們可以得到純Scala的中值,然後只選擇大於或等於列索引這個值的列。

讓我們先從定義計算的中線的功能,它是this example稍作修改:

def median(seq: Seq[Long]): Long = { 
    //In order if you are not sure that 'seq' is sorted 
    val sortedSeq = seq.sortWith(_ < _) 

    if (seq.size % 2 == 1) sortedSeq(sortedSeq.size/2) 
    else { 
    val (up, down) = sortedSeq.splitAt(seq.size/2) 
    (up.last + down.head)/2 
    } 
} 

我們首先計算出所有列的款項並將其轉換爲Seq[Long]

import org.apache.spark.sql.functions._ 
val sums = df.select(df.columns.map(c => sum(col(c)).alias(c)): _*) 
      .first.toSeq.asInstanceOf[Seq[Long]] 

然後我們計算出median,

val med = median(sums) 

而且使用它作爲一個門檻,以產生列指數以保持:

val cols_keep = sums.zipWithIndex.filter(_._1 >= med).map(_._2) 

最後,我們一個select()語句內這些指數映射:

df.select(cols_keep map df.columns map col: _*).show() 
+---+---+ 
| v2| v3| 
+---+---+ 
| 3| 5| 
| 4| 3| 
+---+---+ 
+0

以上的插入非常有幫助。非常感謝你 – Jake

+0

注意:需要導入org.apache.spark.sql.functions._ – Jake