
Getting de-duplicated records in Spark: I have a DataFrame df as described below:

**customers** **product** **val_id** **rule_name** **rule_id** **priority**
      1           A          1          ABC          123          1
      3           Z          r          ERF          789          2
      2           B          X          ABC          123          2
      2           B          X          DEF          456          3
      1           A          1          DEF          456          2

I want to create a new DataFrame df2 that has only one record per unique customer id. Since the rule_name and rule_id columns differ across rows for the same customer, I want to keep, for each customer, the record with the highest priority (the lowest priority value). So my final result should be:

**customers** **product** **val_id** **rule_name** **rule_id** **priority**
      1           A          1          ABC          123          1
      3           Z          r          ERF          789          2
      2           B          X          ABC          123          2

Can anyone please help me achieve this using Spark Scala? Any help would be appreciated.

Answers

Answer (score 3)

You basically want to select rows with an extreme value in a column. This is a very common problem, so there is even a dedicated tag for it. Also see the question SQL Select only rows with Max Value on a Column, which has a good answer.

Below is an example for your specific case.

Note that this can select multiple rows for a customer, if that customer has multiple rows sharing the same (minimum) priority value.

This example is in PySpark, but it should be straightforward to translate to Scala:

from pyspark.sql import functions as F

# find the best (minimum) priority for each customer; this DF has only two columns
cusPriDF = df.groupBy("customers").agg(F.min(df["priority"]).alias("priority"))
# now join back to keep only those rows and get all columns back
bestRowsDF = df.join(cusPriDF, on=["customers", "priority"], how="inner")
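Without a Spark cluster handy, the same two-step technique (minimum per group, then inner join back) can be sketched in plain Python; the row tuples below mirror the question's table:

```python
# Plain-Python sketch of "min per group, then join back".
# Each tuple is (customers, product, val_id, rule_name, rule_id, priority).
rows = [
    (1, "A", "1", "ABC", 123, 1),
    (3, "Z", "r", "ERF", 789, 2),
    (2, "B", "X", "ABC", 123, 2),
    (2, "B", "X", "DEF", 456, 3),
    (1, "A", "1", "DEF", 456, 2),
]

# Step 1: the groupBy/agg -- minimum priority per customer.
best = {}
for cust, *_, pri in rows:
    best[cust] = min(best.get(cust, pri), pri)

# Step 2: the inner join on (customers, priority) -- keep only rows
# whose priority equals their customer's minimum.
top_rows = [r for r in rows if best[r[0]] == r[-1]]
```

As with the Spark version, a customer with two rows tied at the minimum priority would keep both.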
Answer (score 0)

To create df2 by priority you must first order df, then take the first row for each unique customer id, like this:

val columns = df.schema.map(_.name).filterNot(_ == "customers").map(col => first(col).as(col))

val df2 = df.orderBy("priority").groupBy("customers").agg(columns.head, columns.tail: _*)
df2.show

It will give you the desired output:

+----------+--------+-------+----------+--------+---------+
| customers| product| val_id| rule_name| rule_id| priority|
+----------+--------+-------+----------+--------+---------+
|         1|       A|      1|       ABC|     123|        1|
|         3|       Z|      r|       ERF|     789|        2|
|         2|       B|      X|       ABC|     123|        2|
+----------+--------+-------+----------+--------+---------+
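One caveat with this orderBy-then-groupBy approach: Spark does not strictly guarantee that first() sees rows in sorted order after the shuffle, so a window function (row_number over a window partitioned by customers and ordered by priority, keeping row number 1) is the more commonly recommended idiom. Its effect can be sketched in plain Python on the question's data:

```python
from itertools import groupby

# Each tuple is (customers, product, val_id, rule_name, rule_id, priority).
rows = [
    (1, "A", "1", "ABC", 123, 1),
    (3, "Z", "r", "ERF", 789, 2),
    (2, "B", "X", "ABC", 123, 2),
    (2, "B", "X", "DEF", 456, 3),
    (1, "A", "1", "DEF", 456, 2),
]

# "partition by customers, order by priority": sort by the partition key,
# then by priority within each partition.
ordered = sorted(rows, key=lambda r: (r[0], r[-1]))

# Keep only the "row_number() == 1" row of each customer partition.
df2 = [next(group) for _, group in groupby(ordered, key=lambda r: r[0])]
```

Unlike the join-back approach, this always returns exactly one row per customer, even when priorities are tied.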
Answer (score 0)

Corey beat me to it, but here's the Scala version:

import org.apache.spark.sql.functions.min
import spark.implicits._

val df = Seq(
  (1, "A", "1", "ABC", 123, 1),
  (3, "Z", "r", "ERF", 789, 2),
  (2, "B", "X", "ABC", 123, 2),
  (2, "B", "X", "DEF", 456, 3),
  (1, "A", "1", "DEF", 456, 2)
).toDF("customers", "product", "val_id", "rule_name", "rule_id", "priority")

val priorities = df.groupBy("customers").agg(min(df.col("priority")).alias("priority"))
val top_rows = df.join(priorities, Seq("customers", "priority"), "inner")

+---------+--------+-------+------+---------+-------+
|customers|priority|product|val_id|rule_name|rule_id|
+---------+--------+-------+------+---------+-------+
|        1|       1|      A|     1|      ABC|    123|
|        3|       2|      Z|     r|      ERF|    789|
|        2|       2|      B|     X|      ABC|    123|
+---------+--------+-------+------+---------+-------+
Answer (score 0)

You will have to use a min aggregation on priority, grouping the dataframe by customers, then inner join the original dataframe with the aggregated dataframe and select the required columns.

// min, not max: priority 1 is the highest priority
val aggregatedDF = dataframe.groupBy("customers").agg(min("priority").as("priority_1"))
  .withColumnRenamed("customers", "customers_1")

val finalDF = dataframe.join(aggregatedDF, dataframe("customers") === aggregatedDF("customers_1") && dataframe("priority") === aggregatedDF("priority_1"))
finalDF.select("customers", "product", "val_id", "rule_name", "rule_id", "priority").show

You should get the desired result.
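Note that the aggregation has to take the minimum priority value, not the maximum, because priority 1 is the highest priority in this data. A quick plain-Python check of what a max aggregation would select instead:

```python
# Each tuple is (customers, product, val_id, rule_name, rule_id, priority).
rows = [
    (1, "A", "1", "ABC", 123, 1),
    (3, "Z", "r", "ERF", 789, 2),
    (2, "B", "X", "ABC", 123, 2),
    (2, "B", "X", "DEF", 456, 3),
    (1, "A", "1", "DEF", 456, 2),
]

# Aggregate with max instead of min.
agg = {}
for cust, *_, pri in rows:
    agg[cust] = max(agg.get(cust, pri), pri)

# Joining back on (customers, priority) now keeps the wrong rows:
# customer 2 keeps DEF/456 (priority 3) instead of ABC/123 (priority 2).
wrong = [r for r in rows if agg[r[0]] == r[-1]]
```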