2016-08-02 170 views
5

我正在處理大火花DataFrame中的一列數字,我想創建一個新列,該列存儲該列中出現的唯一數字的彙總列表。有沒有辦法將限制參數傳遞給Spark中的functions.collect_set?

基本上正是functions.collect_set所做的。不過,我只需要聚合列表中最多1000個元素。有什麼辦法可以通過某種方式將參數傳遞給函數.collect_set()或者其他任何方式來獲取聚合列表中最多1000個元素,而不使用UDAF?

由於列太大,我希望避免收集所有元素並在之後修剪列表。

謝謝!

回答

1

使用採取

val firstThousand = rdd.take(1000) 

將返回第1000 收集也有可以提供過濾功能。這可以讓你對返回的內容更加具體。

+0

感謝您的回答。但是, 1)我只喜歡_distinct_值的列表。我看到有一個rdd.distinct(),但似乎沒有限制參數 2)不知道如何在collect中使用過濾器函數。我將如何使用過濾器來獲取一定數量的值? – user1500142

+0

此外,理想情況下我想避免使用rdds。我目前像df.groupBy()。agg( user1500142

1
scala> df.show 
    +---+-----+----+--------+ 
    | C0| C1| C2|  C3| 
    +---+-----+----+--------+ 
    | 10| Name|2016| Country| 
    | 11|Name1|2016|country1| 
    | 10| Name|2016| Country| 
    | 10| Name|2016| Country| 
    | 12|Name2|2017|Country2| 
    +---+-----+----+--------+ 

scala> df.groupBy("C1").agg(sum("C0")) 
res36: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

scala> res36.show 
+-----+-------+ 
| C1|sum(C0)| 
+-----+-------+ 
|Name1|  11| 
|Name2|  12| 
| Name|  30| 
+-----+-------+ 

scala> df.limit(2).groupBy("C1").agg(sum("C0")) 
    res33: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

    scala> res33.show 
    +-----+-------+ 
    | C1|sum(C0)| 
    +-----+-------+ 
    | Name|  10| 
    |Name1|  11| 
    +-----+-------+ 



    scala> df.groupBy("C1").agg(sum("C0")).limit(2) 
res2: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

scala> res2.show 
+-----+-------+ 
| C1|sum(C0)| 
+-----+-------+ 
|Name1|  11| 
|Name2|  12| 
+-----+-------+ 

scala> df.distinct 
res8: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> res8.show 
+---+-----+----+--------+ 
| C0| C1| C2|  C3| 
+---+-----+----+--------+ 
| 11|Name1|2016|country1| 
| 10| Name|2016| Country| 
| 12|Name2|2017|Country2| 
+---+-----+----+--------+ 

scala> df.dropDuplicates(Array("c1")) 
res11: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> res11.show 
+---+-----+----+--------+              
| C0| C1| C2|  C3| 
+---+-----+----+--------+ 
| 11|Name1|2016|country1| 
| 12|Name2|2017|Country2| 
| 10| Name|2016| Country| 
+---+-----+----+--------+ 
+0

感謝您的答案,但這並不完全符合我的要求。如果我想從一列中取得1000個不同的值,「df.limit(1000)」將對返回值的數量設置一個硬性上限,但是我可能會丟失不同的值,否則我應該添加其他值。 – user1500142

+0

你有兩種不同的方法,你可以在limit,groupby和agg方法之前執行dropDuplicates。 Distinct將查看所有列,droDuplicates允許您控制要比較哪些列以識別重複項。 @ user1500142 – mark

2

我正在使用collect_set和collect_list函數的修改副本;由於代碼範圍的原因,修改後的副本必須與原始文件位於相同的包路徑中。鏈接的代碼適用於Spark 2.1.0;如果您使用的是先前版本,方法簽名可能會有所不同。

拋出此文件(https://gist.github.com/lokkju/06323e88746c85b2ce4de3ea9cdef9bc)爲您的項目的src/main /組織/阿帕奇/火花/ SQL /催化劑/表達/ collect_limit.scala

使用它作爲:

import org.apache.spark.sql.catalyst.expression.collect_limit._ 
df.groupBy('set_col).agg(collect_set_limit('set_col,1000) 
3

我的解決辦法與Loki's answer with collect_set_limit非常相似。


我會使用一個UDF你想要什麼,會做後collect_set(或collect_list)或更難UDAF。

鑑於UDF的更多經驗,我會首先考慮。即使UDF沒有優化,對於這種用例也沒有問題。

val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) } 
val sample = spark.range(50).withColumn("key", $"id" % 5) 

scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false) 
+---+--------------------------------------+ 
|key|all         | 
+---+--------------------------------------+ 
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]| 
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]| 
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]| 
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]| 
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]| 
+---+--------------------------------------+ 

scala> sample. 
    groupBy("key"). 
    agg(collect_set("id") as "all"). 
    withColumn("limit(3)", limitUDF($"all", lit(3))). 
    show(false) 
+---+--------------------------------------+------------+ 
|key|all         |limit(3) | 
+---+--------------------------------------+------------+ 
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] | 
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] | 
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]| 
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]| 
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] | 
+---+--------------------------------------+------------+ 

functions對象(udf功能的文檔)。

相關問題