2016-11-30 125 views
6

問題在標題中有很多:有沒有一種有效的方法來計算DataFrame中每列的不同值?Spark DataFrame:計算每列的不同值

describe方法只提供計數,但不提供不同的計數,我不知道是否有一種方法來獲得所有(或某些選定)列的不同計數。

+0

那麼,這取決於。如果你有一個真正的大集羣,你可以分割你的數據。之後,您可以創建一個可以計算每列的循環。這些數字將在paralallel工作。要說清楚,如果您擁有一個擁有1000名工作人員的羣集,則可以將數據分區爲200.每次可以統計5列。但你的問題不是那麼簡單。 –

回答

12

多個聚合是計算相當昂貴,所以我建議你使用近似重複計數:

val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3") 

val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap 
df.agg(exprs).show() 
// +---------------------------+---------------------------+---------------------------+ 
// |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)| 
// +---------------------------+---------------------------+---------------------------+ 
// |       2|       2|       3| 
// +---------------------------+---------------------------+---------------------------+ 

approx_count_distinct方法引擎蓋下依靠HyperLogLog

HyperLogLog算法及其變體HyperLogLog ++(Spark中實現)依賴於以下聰明觀察。

如果數字在一個範圍內均勻分佈,那麼不同元素的數量可以從數字的二進制表示中的前導零的最大數目近似。

例如,如果我們觀察到一個二進制數字的形式爲0…(k times)…01…1的數字,那麼我們可以估計該集合中有2^k個元素的順序。這是一個非常粗略的估計,但它可以通過繪製算法精確到極高的精度。

該算法背後的機制的詳細解釋可以在original paper中找到。

注:啓動星火1.6,當星火調用SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df各條款應引起分開的聚集每個條款。而這與我們聚合一次的SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df不同。因此,當使用count(distinct(_))approxCountDistinct(或approx_count_distinct)時,性能不會相當。

它的自火花1.6行爲的變化之一:

隨着改進的查詢規劃對於具有不同的聚合(火花9241)查詢,具有單個不同聚合具有一個查詢的俯視已被更改爲更強大的版本。要切換回由Spark 1.5的計劃程序生成的計劃,請將spark.sql.specializeSingleDistinctAggPlanning設置爲true。 (SPARK-12077)

參考:Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles

9

pySpark你可以做這樣的事情,使用countDistinct()

from pyspark.sql.functions import col, countDistinct 

df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)) 

同樣在Scala

import org.apache.spark.sql.functions.countDistinct 
import org.apache.spark.sql.functions.col 

df.select(df.columns.map(c => countDistinct(col(c)).alias(c)): _*) 

如果你想準確的潛在損失,以加快速度,你可以也使用approxCountDistinct()

0

如果你只是想計算一個特定的列然後下面可以幫助。雖然它的答案很晚。它可能會幫助某人。 (pyspark 2.2.0測試)

from pyspark.sql.functions import col, countDistinct 
df.agg(countDistinct(col("colName")).alias("count")).show()