0

我有一個39列的數據幀,每列都有不同的正常範圍。 通過使用正常範圍,我想找出正常值,並把0否則把1.使用scala數據幀中的最小值和最大值來尋找正常值

這是我所做的,但我想爲39列做。

val test :(Double => Double) = (value: Double) => 
{ 
    if(value >= 45 && value <= 62) 0 
    else 1 
} 

但我不明白如何使用不同的值到每一列。

用於離: 我有這樣的DF

+--------------------+---------+-------------------------+---------+ 
|a     |b  |c      |d  | 
+--------------------+---------+-------------------------+---------+ 
|    207.0|  40.0|     193.0|  39.0| 
|    98.0|  17.0|     193.0|  15.0| 
|    207.0|  13.0|     193.0|  17.0| 
|    207.0|  26.0|     193.0|  23.0| 
|    207.0|  35.0|     193.0|  24.0| 
|    207.0|  91.0|     193.0|  45.0| 
|    207.0|  40.0|     193.0|  37.0| 
|    207.0|  23.0|     193.0|  23.0| 
|    207.0|  26.0|     193.0|  22.0| 
|    207.0|  39.0|     193.0|  34.0| 

我想導致像下面使用範圍

col range 
a 50-160 
b 1-21 
c 5-40 
d 7-27 

如果在範圍內的值,則0否則爲1

+--------------------+---------+-------------------------+---------+ 
|a     |b  |c      |d  | 
+--------------------+---------+-------------------------+---------+ 
|     1.0|  1.0|      1.0|  1.0| 
|     0.0|  0.0|      1.0|  0.0| 
|     1.0|  0.0|      1.0|  0.0| 
|     1.0|  1.0|      1.0|  0.0| 
|     1.0|  1.0|      1.0|  0.0| 
|     1.0|  1.0|      1.0|  1.0| 
|     1.0|  1.0|      1.0|  1.0| 
|     1.0|  1.0|      1.0|  0.0| 
|     1.0|  1.0|      1.0|  0.0| 
|     1.0|  1.0|      1.0|  1.0| 

I want to do this for 39 columns.(scala/pyspark preferred) 

回答

1

您應該定義一個用戶定義的函數(UDF),然後將其應用於您的每個列NT。

這是關於Scala的用戶定義函數的文檔。它非常完整,我鼓勵你閱讀它。

下面是摘錄,以幫助您快速瞭解,我想在這裏去:

scala> df.withColumn("upper", upper('text)).show 
+---+-----+-----+ 
| id| text|upper| 
+---+-----+-----+ 
| 0|hello|HELLO| 
| 1|world|WORLD| 
+---+-----+-----+ 

// You could have also defined the UDF this way 
val upperUDF = udf { s: String => s.toUpperCase } 

// or even this way 
val upperUDF = udf[String, String](_.toUpperCase) 

scala> df.withColumn("upper", upperUDF('text)).show 
+---+-----+-----+ 
| id| text|upper| 
+---+-----+-----+ 
| 0|hello|HELLO| 
| 1|world|WORLD| 
+---+-----+-----+ 

你看你的功能適用於整列,其結果將是一個新的欄目。因此,你的函數應該是這樣的:

def isInRange(e: Number, min: Number, max: Number): Boolean = (e < max && e > min) 

然後,對於給定的minValue(最小值)和maxValue(最大值),你要做的就是:

myDF.withColumn("isInRange_a", udf(x => isInRange(x, minValue, maxValue).apply(myDF("a"))) 

你現在能做些什麼,在給定的名單套用/數據幀包含(varName中,包括maxValue,minValue(最小值))是:

  • 任一個地圖/減少操作,在這裏將計算爲每列不管它是在給定範圍或沒有。然後,你會加入一個給定的密鑰(我不知道你的問題很多,所以我不能幫你在這裏)。這個解決方案可以工作,但隨着數據的增長,效率會變得非常低,因爲您可能擁有幾個相似的鍵。

  • 無論是遞歸的操作,其目的是執行類似:myDF.whithColumn(...).withColumn(...).withColumn(...)

第二種解決方案是一個我會因爲這會看起來很像鍵的選擇。

你是怎麼做到的?

def applyMyUDFRecursively(myDF: DataFrame, List[MyRange]: rangesList): DataFrame = 
if (rangesList == null || rangesList.isEmpty) myDF 
else applyMyUDFRecursively(
    myDF.withColumn(myDF.withColumn("isInRange_" + rangesList.head._0, udf(x => isInRange(x, rangesList.head._1, rangesList.head._2).apply(myDF(rangesList.head._0))), rangesList.tail) 

現在,您已應用於所有列,但列可能太多。做這樣的事情:

resultDF.drop(rangesList.map(case x => x._0).collect: _*) 

公告類型歸屬到降功能適用於所有的元素時,地圖/收集

與VAL MyRange獲得的名單內= SEQ(varName中:字符串,最小:數字, max:Number)

例如:爲您的數據幀,它應該像這樣(簡化版本):

def recApply(myDF: DataFrame, cols: List[String]): DataFrame = 
if (cols == null || cols.isEmpty) myDF 
else recApply(myDF.withColumn(myDF.withColumn("isInRange_" + col.head, udf(x => test(x).apply(myDF(cols.head))), cols.tail) 

然後,應用此功能,您的DF和存儲您的結果:

val my_result = recApply(myDF, myDF.cols) 
+1

請讓我知道,如果事情還不清楚,我希望我給你鑰匙,讓你現在自己處理這個問題,並毫不猶豫地將問題標記爲答案,如果這適合你 – belka

+0

我很感謝你的回答這是最詳細的答案我曾經得到但仍然沒有工作所有39欄可以只顯示上面的示例數據,它會更有幫助 –

+0

也,你可以看到我的udf以上 –

相關問題