2017-09-12 39 views
4

我需要按「KEY」列進行分組,並且需要檢查「TYPE_CODE」列是否同時具有「PL」和「JL」值,如果是這樣,那麼我需要添加指標列如 「Y」 其他 「N」Spark Dataframe Group有新指標列

實施例:

//Input Values 
    val values = List(List("66","PL") , 
    List("67","JL") , List("67","PL"),List("67","PO"), 
    List("68","JL"),List("68","PO")).map(x =>(x(0), x(1))) 

    import spark.implicits._ 
    //created a dataframe 
    val cmc = values.toDF("KEY","TYPE_CODE") 

    cmc.show(false) 
    ------------------------ 
    KEY |TYPE_CODE | 
    ------------------------ 
    66 |PL | 
    67 |JL | 
    67 |PL | 
    67 |PO | 
    68 |JL | 
    68 |PO | 
    ------------------------- 

預期輸出:

對於每個 「KEY」,如果它有 「類型代碼」 既有PL & JL,則Y 別的ñ

----------------------------------------------------- 
    KEY |TYPE_CODE | Indicator 
    ----------------------------------------------------- 
    66 |PL   | N 
    67 |JL   | Y 
    67 |PL   | Y 
    67 |PO   | Y 
    68 |JL   | N 
    68 |PO   | N 
    --------------------------------------------------- 

例如, 67具有兩個PL & JL - 因此, 「Y」 66具有隻PL - 所以 「N」 68具有僅JL - 所以 「N」

+0

我不知道spark的groupby支持這種聚合。您可能必須創建自己的功能才能這樣做 – Sentinel

回答

2

一個選項:

1)收集TYPE_CODE作爲列表;

2)檢查它是否包含特定的字符串;

3)然後壓平與explode列表:

(cmc.groupBy("KEY") 
    .agg(collect_list("TYPE_CODE").as("TYPE_CODE")) 
    .withColumn("Indicator", 
     when(array_contains($"TYPE_CODE", "PL") && array_contains($"TYPE_CODE", "JL"), "Y").otherwise("N")) 
    .withColumn("TYPE_CODE", explode($"TYPE_CODE"))).show 
+---+---------+---------+ 
|KEY|TYPE_CODE|Indicator| 
+---+---------+---------+ 
| 68|  JL|  N| 
| 68|  PO|  N|  
| 67|  JL|  Y| 
| 67|  PL|  Y| 
| 67|  PO|  Y| 
| 66|  PL|  N| 
+---+---------+---------+ 
4

另一種選擇:

  1. 組由KEY並使用agg創建兩個單獨的指示符列(一個用於JL和關於PL ),然後計算組合指標

  2. join與原來的數據幀

共有:

val indicators = cmc.groupBy("KEY").agg(
    sum(when($"TYPE_CODE" === "PL", 1).otherwise(0)) as "pls", 
    sum(when($"TYPE_CODE" === "JL", 1).otherwise(0)) as "jls" 
).withColumn("Indicator", when($"pls" > 0 && $"jls" > 0, "Y").otherwise("N")) 

val result = cmc.join(indicators, "KEY") 
    .select("KEY", "TYPE_CODE", "Indicator") 

這可能是比@ Psidom的回答慢,但可能是安全 - collect_list如果你有比賽的一個巨大的數字可能會出現問題一個特定的鍵(該列表必須存儲在單個工作人員的存儲器中)。

EDIT

在情況下,輸入被稱爲是獨特(即JL/PL只會出現每按鍵一次,至多),indicators可以創建使用簡單count聚合,這是(可以說)更容易閱讀:

val indicators = cmc 
    .where($"TYPE_CODE".isin("PL", "JL")) 
    .groupBy("KEY").count() 
    .withColumn("Indicator", when($"count" === 2, "Y").otherwise("N"))