2015-10-05 47 views
8

我有Foo類的RDD:class Foo(name : String, createDate : Date)。 我想要一個其他的RDD,舊版本號爲Foo。 我的第一個想法是排序createDate和0.1 *計數限制,但沒有限制功能。如何對Spark進行排序和限制?

你有什麼想法嗎?

回答

14

假設Foo是的情況下,類這樣的:

import java.sql.Date 
case class Foo(name: String, createDate: java.sql.Date) 
  1. 使用普通RDDS:

    import org.apache.spark.rdd.RDD 
    import scala.math.Ordering 
    
    val rdd: RDD[Foo] = sc 
        .parallelize(Seq(
        ("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"), 
        ("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23"))) 
        .toDF("name", "createDate") 
        .withColumn("createDate", $"createDate".cast("date")) 
        .as[Foo].rdd 
    
    rdd.cache() 
    val n = scala.math.ceil(0.1 * rdd.count).toInt 
    
    • 數據裝配到驅動器的存儲器:

        你想
      • 和分數比較小,你想

        rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime)) 
        // Array[Foo] = Array(Foo(a,2009-11-23)) 
        
      • 比例是比較大的:

        rdd.sortBy(_.createDate.getTime).take(n) 
        
    • 否則

      rdd 
          .sortBy(_.createDate.getTime) 
          .zipWithIndex 
          .filter{case (_, idx) => idx < n} 
          .keys 
      
  2. 使用DataFrame(注意 - 由於極限行爲,這實際上並不是最佳性能)。

    import org.apache.spark.sql.Row 
    
    val topN = rdd.toDF.orderBy($"createDate").limit(n) 
    topN.show 
    
    // +----+----------+ 
    // |name|createDate| 
    // +----+----------+ 
    // | a|2009-11-23| 
    // +----+----------+ 
    
    
    // Optionally recreate RDD[Foo] 
    topN.map{case Row(name: String, date: Date) => Foo(name, date)} 
    
+1

嗨zero323你能告訴真快,爲什麼數據幀的表現上限制操作不理想?在執行方面與RDD相比有什麼不同? @ zero333 –

+0

@XinweiLiu我已經提供了你的問題的答案。我希望它能解釋發生了什麼。 – zero323

+1

很好的答案@ zero323。但我還是有同樣的問題,劉新偉有。爲什麼df.limit()很慢? – guilhermecgs