2017-05-19 151 views
0

相關獲得列值比方說,我有三列的數據幀:在星火1.6 /斯卡拉,與骨料

itemid, date, price 
1, 2017-05-18, $1.10 
2, 2017-05-18, $2.20 
1, 2017-04-12, $0.90 
1, 2017-03-29, $1.00 

現在,我想按的itemid,得到的最早日期,並獲得價格匹配最早的日期。 (我們可以假設(的itemid,日期)是唯一的)

的輸入輸出上面會:

1, 2017-03-29, $1.00 
2, 2017-05-18, $2.20 

在SQL中,我可以用做自聯接 - 第一選擇每個itemid的最短日期,然後選擇日期與最低日期匹配的價格和日期。

我該如何在Scala Spark DataFrame中表達這一點? 如果答案仍然涉及到自連接,那麼Spark 1.6中的DataFrame查詢執行器是否足夠聰明,不足以實際實現連接?

回答

1

一種方法是使用類似於以下SparkSQL窗函數:

import org.apache.spark.sql.expressions.Window 

val df = Seq(
    (1, "2017-05-18", 1.10), 
    (2, "2017-05-18", 2.20), 
    (1, "2017-04-12", 0.90), 
    (1, "2017-03-29", 1.00) 
).toDF(
    "itemid", "date", "price" 
).as[(Integer, String, Double)] 

// Add earliest date by itemid via window function and 
// keep only rows with earliest date by itemid 
val df2 = df.withColumn("earliestDate", min("date").over(
    Window.partitionBy("itemid") 
)). 
    where($"date" === $"earliestDate") 

df2.show 
+------+----------+-----+------------+ 
|itemid|  date|price|earliestDate| 
+------+----------+-----+------------+ 
|  1|2017-03-29| 1.0| 2017-03-29| 
|  2|2017-05-18| 2.2| 2017-05-18| 
+------+----------+-----+------------+ 
+0

謝謝你的解決方案。事實證明,這與自連接方法有點類似 - 因爲(itemid,date)已經是一個有保證的唯一鍵,我可以使用標準聚合計算最小价格換ID,然後重新加入。我不是使用行ID,而是使用唯一鍵,而不是使用窗口,我可以使用groupBy()。 –

+0

@Jon Watte,是的,使用groupBy和self-join(itemid,date)是唯一的,不需要創建一個唯一的列。事實上,在這種簡單的情況下,如果使用窗口函數,則不需要自連接(因此肯定不需要創建唯一的rowid)。我已經更新了我的答案。 –

+0

感謝您的澄清,並給出正確答案! –