2017-03-07 32 views
1

我在Scala中的DataFrame中存在將值置換的問題。我最初的DataFrame看起來是這樣的:Spark:將UDF應用於數據框根據DF中的值生成新列

+----+----+----+----+ 
|col1|col2|col3|col4| 
+----+----+----+----+ 
| A| X| 6|null| 
| B| Z|null| 5| 
| C| Y| 4|null| 
+----+----+----+----+ 

col1col2String類型和col3col4Int

而結果應該是這樣的:

+----+----+----+----+------+------+------+ 
|col1|col2|col3|col4|AXcol3|BZcol4|CYcol4| 
+----+----+----+----+------+------+------+ 
| A| X| 6|null|  6| null| null| 
| B| Z|null| 5| null|  5| null| 
| C| Y| 4| 4| null| null|  4| 
+----+----+----+----+------+------+------+ 

這意味着三個新列後應col1col2並提取值的列被命名。提取的值來自列col2,col3col5,取決於哪個值不是null

那麼如何實現呢?我首先想到的一個UDF這樣的:

def myFunc (col1:String, col2:String, col3:Long, col4:Long) : (newColumn:String, rowValue:Long) = { 
    if col3 == null{ 
     val rowValue=col4; 
     val newColumn=col1+col2+"col4"; 
    } else{ 
     val rowValue=col3; 
     val newColumn=col1+col2+"col3"; 
    } 
    return (newColumn, rowValue); 
} 

val udfMyFunc = udf(myFunc _) //needed to treat it as partially applied function 

但我怎麼能以正確的方式把它從數據幀?

當然,上面的所有代碼都是垃圾,可能有更好的方法。因爲我只是玩雜耍的第一個代碼片段讓我知道...比較Int值到null已不起作用。

任何幫助表示讚賞!謝謝!

+0

[Apache Spark - 將UDF的結果分配給多個數據幀列]可能的重複(http://stackoverflow.com/questions/35322764/apache-spark-assign-the-result-of-udf-to-多個數據幀列) – jwvh

回答

0

好的,我有一個解決方法來實現我想要的。我執行以下操作:

(1)I生成包含的元組的新列與[newColumnName,rowValue]以下這個建議Derive multiple columns from a single column in a Spark DataFrame

case class toTuple(newColumnName: String, rowValue: String) 

def createTuple (input1:String, input2:String) : toTuple = { 
    //do something fancy here 
    var column:String= input1 + input2 
    var value:String= input1   
    return toTuple(column, value) 
} 

val UdfCreateTuple = udf(createTuple _) 

(2)應用功能DataFrame

dfNew= df.select($"*", UdfCreateTuple($"col1",$"col2").alias("tmpCol") 

(3)創建具有不同值的數組newColumnName

val dfDistinct = dfNew.select($"tmpCol.newColumnName").distinct 

(4)創建具有不同值

var a = dfDistinct.select($"newCol").rdd.map(r => r(0).asInstanceOf[String]) 

var arrDistinct = a.map(a => a).collect() 

(5)創建密鑰值映射

var seqMapping:Seq[(String,String)]=Seq() 
for (i <- arrDistinct){ 
    seqMapping :+= (i,i) 
} 

(6)應用映射原始數據幀,比照一個數組Mapping a value into a specific column based on annother column

val exprsDistinct = seqMapping.map { case (key, target) => 
    when($"tmpCol.newColumnName" === key, $"tmpCol.rowValue").alias(target) } 

val dfFinal = dfNew.select($"*" +: exprsDistinct: _*) 

嗯,這是有點麻煩,但我可以得到一組新的列不知道有多少,並在同一時間的價值轉移到新的列。

評論贊賞!也許有更快的方法?

最佳,肯

+0

嗨,肯,我回答你的問題,遲到比我猜不到! – LucieCBurgess

1

我正好與我自己的數據框同樣的問題,所以我想我會分享答案(即使它是你問:-)後10個月,我碰到你的問題,我想答案可能對其他人有幫助。有一個簡單的方法:

val df3 = df2.withColumn("newCol", concat($"col1", $"col2")) //Step 1 
      .withColumn("value",when($"col3".isNotNull, $"col3").otherwise($"col4")) //Step 2 
      .groupBy($"col1",$"col2",$"col3",$"col4",$"newCol") //Step 3 
      .pivot("newCol") // Step 4 
      .agg(max($"value")) // Step 5 
      .orderBy($"newCol") // Step 6 
      .drop($"newCol") // Step 7 

     df3.show() 

步驟如下工作:

  1. 添加含有COL1與COL2
  2. //添加新列,「值鏈接的內容的新列「,其中包含col3或col4的非空內容
  3. GroupBy所需的列
  4. 在newCol上轉動,其中包含現在爲列標題的值
  5. 按值的最大值進行聚合,如果groupBy是每個組的單值,則該值將成爲值本身;或可替代.agg(first($"value"))如果值恰好是一個字符串,而不是一個數值類型 - 最大功能只能通過NEWCOL應用於數字類型
  6. 爲了使DF是按升序排列
  7. 降本欄目爲你不再需要它,或者跳過這一步,如果你想要一列沒有空值的數值

由於@ user8371915的幫助,我首先回答了我自己的關鍵問題。

結果如下:

+----+----+----+----+----+----+----+ 
|col1|col2|col3|col4| AX| BZ| CY| 
+----+----+----+----+----+----+----+ 
| A| X| 6|null| 6|null|null| 
| B| Z|null| 5|null| 5|null| 
| C| Y| 4| 4|null|null| 4| 
+----+----+----+----+----+----+----+ 

您可能需要玩的列標題字符串連接來獲得正確的結果。

+1

@ user8371915,很好的編輯! :-) – LucieCBurgess

相關問題