2017-03-28 107 views
1

假設,我有一個數據幀象下面這樣:如何根據spark數據框中的某些列過濾掉重複的行?

enter image description here

在這裏,你可以看到,事務處理編號1,2和3具有相同的值用於列A,B,C,但對於列d不同的值和E. E列有日期條目。

  1. 對於相同的A,B和C組合(A = 1,B = 1,C = 1),我們有3行。基於E列的最近交易日期,我想只取一行表示具有最近日期的行。但對於最近的日期,有2個交易。但是,如果在A列中找到A,B,C和最近日期的相同組合的兩個或更多行,我想只取其中一個。 因此,我對該組合的預期產出將爲行編號3或4 (任何人都會這樣做)。
  2. 對於相同的A,B和C組合(A = 2,B = 2,C = 2),我們有2行。但是根據列E,最近的日期是行號5的日期。因此,我們將這一行作爲A,B和C的這種組合。 因此,我對該組合的預期輸出將是行號

所以最終的輸出將是(3和5)或(4和5)

現在我應該怎麼處理方法:

  1. 我這樣說的:

兩個reduceByKey和groupByKey可用於同一目的,但 reduceByKey工作在一個大的要好得多數據集。這是因爲在 混洗數據之前,Spark 知道它可以將輸出與每個分區上的公用密鑰組合在一起。

  • 我試圖與GROUPBY上的列A,B,C和最大上柱E.但它不能給我的行的頭,如果存在的多行同一日期。
  • 什麼是最優化的方法來解決這個問題?提前致謝。

    編輯:我需要找回我過濾的交易。如何做到這一點呢?

    +0

    什麼版本的火花您使用的是? – eliasah

    +0

    方法2更好。函數「max」總是會返回一個最大日期。如果存在多個這樣的日期,則只會選擇一個。 – pasha701

    +0

    我正在使用spark-2.1.0版本 –

    回答

    0

    我已經使用spark window functions讓我的解決方案:

    val window = Window 
         .partitionBy(dataframe("A"), dataframe("B"),dataframe("C")) 
         .orderBy(dataframe("E") desc) 
    
    val dfWithRowNumber = dataframe.withColumn("row_number", row_number() over window) 
    val filteredDf = dfWithRowNumber.filter(dfWithRowNumber("row_number") === 1) 
    
    +0

    這是保證做全面洗牌。您可能希望將它與實現相同的pairRDD/reduceByKey選項進行比較。 – sourabh

    0

    鏈接可能通過幾個步驟。 Agregated數據框中:

    val agregatedDF=initialDF.select("A","B","C","E").groupBy("A","B","C").agg(max("E").as("E_max")) 
    

    鏈接INTIAL-agregated:

    initialDF.join(agregatedDF, List("A","B","C")) 
    

    如果初始數據幀來自蜂巢,都可以簡化。

    0
    val initialDF = Seq((1,1,1,1,"2/28/2017 0:00"),(1,1,1,2,"3/1/2017 0:00"), 
    (1,1,1,3,"3/1/2017 0:00"),(2,2,2,1,"2/28/2017 0:00"),(2,2,2,2,"2/25/20170:00")) 
    

    這將錯過相應的山坳(d)

    initialDF 
    .toDS.groupBy("_1","_2","_3") 
    .agg(max(col("_5"))).show 
    

    如果你想爲最大山坳對應的冷:

    initialDF.toDS.map(x=>x._1,x._2,x._3,x._5,x._4))).groupBy("_1","_2","_3") 
    .agg(max(col("_4")).as("_4")).select(col("_1"),col("_2"),col("_3"),col("_4._2"),col("_4._1")).show 
    

    對於ReduceByKey你可以轉換數據集以配對RDD,然後解決它。如果Catalyst無法優化第一個組中的groupByKey,應該更快。請參閱Rolling your own reduceByKey in Spark Dataset

    相關問題