8
我有Foo類的RDD:class Foo(name : String, createDate : Date)
。 我想要一個其他的RDD,舊版本號爲Foo
。 我的第一個想法是排序createDate
和0.1 *計數限制,但沒有限制功能。如何對Spark進行排序和限制?
你有什麼想法嗎?
我有Foo類的RDD:class Foo(name : String, createDate : Date)
。 我想要一個其他的RDD,舊版本號爲Foo
。 我的第一個想法是排序createDate
和0.1 *計數限制,但沒有限制功能。如何對Spark進行排序和限制?
你有什麼想法嗎?
假設Foo
是的情況下,類這樣的:
import java.sql.Date
case class Foo(name: String, createDate: java.sql.Date)
使用普通RDDS:
import org.apache.spark.rdd.RDD
import scala.math.Ordering
val rdd: RDD[Foo] = sc
.parallelize(Seq(
("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"),
("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23")))
.toDF("name", "createDate")
.withColumn("createDate", $"createDate".cast("date"))
.as[Foo].rdd
rdd.cache()
val n = scala.math.ceil(0.1 * rdd.count).toInt
數據裝配到驅動器的存儲器:
和分數比較小,你想
rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))
// Array[Foo] = Array(Foo(a,2009-11-23))
比例是比較大的:
rdd.sortBy(_.createDate.getTime).take(n)
否則
rdd
.sortBy(_.createDate.getTime)
.zipWithIndex
.filter{case (_, idx) => idx < n}
.keys
使用DataFrame(注意 - 由於極限行爲,這實際上並不是最佳性能)。
import org.apache.spark.sql.Row
val topN = rdd.toDF.orderBy($"createDate").limit(n)
topN.show
// +----+----------+
// |name|createDate|
// +----+----------+
// | a|2009-11-23|
// +----+----------+
// Optionally recreate RDD[Foo]
topN.map{case Row(name: String, date: Date) => Foo(name, date)}
嗨zero323你能告訴真快,爲什麼數據幀的表現上限制操作不理想?在執行方面與RDD相比有什麼不同? @ zero333 –
@XinweiLiu我已經提供了你的問題的答案。我希望它能解釋發生了什麼。 – zero323
很好的答案@ zero323。但我還是有同樣的問題,劉新偉有。爲什麼df.limit()很慢? – guilhermecgs