2017-10-06 31 views
0

我有數據框。我需要根據每個Id的updateTableTimestamp表中最新的記錄。 df.show()如何使用上次時間戳從數據框中選擇不同的記錄

+--------------------+-----+-----+--------------------+ 
|   Description| Name| id |updateTableTimestamp| 
+--------------------+-----+-----+--------------------+ 
|     | 042F|64185|  1507306990753| 
|     | 042F|64185|  1507306990759| 
|Testing    |042MF| 941|  1507306990753| 
|     | 058F| 8770|  1507306990753| 
|Testing 3   |083MF|31663|  1507306990759| 
|Testing 2   |083MF|31663|  1507306990753| 
+--------------------+-----+-----+--------------------+ 

需要輸出

+--------------------+-----+-----+--------------------+ 
|   Description| Name| id |updateTableTimestamp| 
+--------------------+-----+-----+--------------------+ 
|     | 042F|64185|  1507306990759| 
|Testing    |042MF| 941|  1507306990753| 
|     | 058F| 8770|  1507306990753| 
|Testing 3   |083MF|31663|  1507306990759| 
+--------------------+-----+-----+--------------------+ 

我已經試過

sqlContext.sql("SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY Id ORDER BY updateTableTimestamp DESC) rank from temptable) tmp where rank = 1") 

它給出了分區錯誤。在線程異常 「主」 java.lang.RuntimeException: [1.29] failure: ``union'' expected but(」 found`I現在用火花1.6.2

+1

「它給錯誤」 - 錯誤是什麼? – FuzzyTree

+1

嘗試'where tmp.rank = 1'或嘗試使用與'rank'不同的別名,因爲它是保留字。 – Simon

+0

不支持PARTITION – lucy

回答

0
import org.apache.spark.sql.functions.first 
import org.apache.spark.sql.functions.desc 
import org.apache.spark.sql.functions.col 

val dfOrder = df.orderBy(col("id"), col("updateTableTimestamp").desc) 

val dfMax = dfOrder.groupBy(col("id")). 
      agg(first("description").as("description"), 
       first("name").as("name"), 
       first("updateTableTimestamp").as("updateTableTimestamp")) 
     dfMax.show 

enter image description here

在此之後,如果你想重新整理你的領域,只是應用塞萊對您的新DF進行編號功能。

+0

太棒了!埃裏克巴拉哈斯謝謝 – lucy

0

選擇 說明,姓名,身份證,updateTableTimestamp 從table_name的 其中id在 (從TABLE_NAME組由updateTableTimestamp選擇ID),以便通過updateTableTimestamp遞減;

相關問題