2017-07-31 104 views
3

我有一個DataFrame,DataFrame hava兩列'value'和'timestamp','timestmp'是有序的,我想得到DataFrame的最後一行,我該怎麼辦?如何從DataFrame獲取最後一行?

這是我輸入:

+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 1|  1| 
| 4|  2| 
| 3|  3| 
| 2|  4| 
| 5|  5| 
| 7|  6| 
| 3|  7| 
| 5|  8| 
| 4|  9| 
| 18|  10| 
+-----+---------+ 

這是我的代碼:

val arr = Array((1,1),(4,2),(3,3),(2,4),(5,5),(7,6),(3,7),(5,8),(4,9),(18,10)) 
    var df=m_sparkCtx.parallelize(arr).toDF("value","timestamp") 

這是我預期的結果:

+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 18|  10| 
+-----+---------+ 
+0

請問'df.where($ 「時間戳」 === MAX($ 「時間戳」)'工作? –

+0

它亙古不變的工作交流rangepartitioning(TS# 7 ASC NULLS FIRST,200) – mentongwu

回答

3

我想簡單地reduce

df.reduce { (x, y) => 
    if (x.getAs[Int]("timestamp") > y.getAs[Int]("timestamp")) x else y 
} 
1

如果timestamp列是獨一無二的,是遞增順序然後有以下方法得到最後一行

println(df.sort($"timestamp", $"timestamp".desc).first()) 

// Output [1,1] 

df.sort($"timestamp", $"timestamp".desc).take(1).foreach(println) 

// Output [1,1] 

df.where($"timestamp" === df.count()).show 

輸出:

+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 18|  10| 
+-----+---------+ 

如果沒有創建索引的新列並選擇最後一個指標如下

val df1 = spark.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map { 
    case (row, index) => Row.fromSeq(row.toSeq :+ index) 
}, 
StructType(df.schema.fields :+ StructField("index", LongType, false))) 

df1.where($"timestamp" === df.count()).drop("index").show 

輸出:

+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 18|  10| 
+-----+---------+ 
+0

排序功能效率低下,我不想使用排序功能 – mentongwu

+0

比你可以使用df.where($「timestamp」=== df.count()) –

1

最有效的方法是到你的DataFrame中reduce。這給你一個你可以轉換回DataFrame的單行,但由於它只包含1條記錄,所以這沒什麼意義。

sparkContext.parallelize(
    Seq(
    df.reduce { 
    (a, b) => if (a.getAs[Int]("timestamp") > b.getAs[Int]("timestamp")) a else b 
    } match {case Row(value:Int,timestamp:Int) => (value,timestamp)} 
) 
) 
.toDF("value","timestamp") 
.show 


+-----+---------+ 
|value|timestamp| 
+-----+---------+ 
| 18|  10| 
+-----+---------+ 

效率較低(因爲它需要改組)雖然短是這樣的解決方案:

df 
.where($"timestamp" === df.groupBy().agg(max($"timestamp")).map(_.getInt(0)).collect.head) 
0

是 我會簡單地使用查詢 - 訂單表格由降序排列 - 來自這需要1個值爲了

df.createOrReplaceTempView("table_df") 
query_latest_rec = """SELECT * FROM table_df ORDER BY value DESC limit 1""" 
latest_rec = self.sqlContext.sql(query_latest_rec) 
latest_rec.show()