15

從spark 2.0.1開始我有一些問題。我看了很多文件,但至今未能找到足夠的答案:Spark 2.0數據集vs DataFrame

  • 是什麼
    • df.select("foo")
    • df.select($"foo")
  • 之間的區別做我理解正確的是
    • myDataSet.map(foo.someVal)是typesafe和wi將不會轉換爲RDD,但保留在DataSet表示中/無額外的開銷(2.0.0的性能)
  • 所有其他命令選擇,..只是語法糖。它們不是類型安全的,可以使用地圖代替。沒有map語句,我怎麼能df.select("foo")類型安全?
    • 爲什麼我應該使用UDF/UADF而不是地圖(假設地圖停留在數據集表示中)?
+0

有一個項目,旨在爲星火更多類型的安全而有效的執行路徑上停留:[typelevel /無框](https://github.com/typelevel/frameless) –

回答

11
  1. df.select("foo")df.select($"foo")之間的區別是簽名。前者至少需要一個String,後者一個零或多個Columns。除此之外沒有實際的區別。
  2. myDataSet.map(foo.someVal)類型檢查,但作爲任何Dataset操作使用RDD的對象,並與DataFrame操作相比,有一個很大的開銷。讓我們來看看一個簡單的例子:

    case class FooBar(foo: Int, bar: String) 
    val ds = Seq(FooBar(1, "x")).toDS 
    ds.map(_.foo).explain 
    
    == Physical Plan == 
    *SerializeFromObject [input[0, int, true] AS value#123] 
    +- *MapElements <function1>, obj#122: int 
        +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar 
         +- LocalTableScan [foo#117, bar#118] 
    

    正如你可以看到這個執行計劃需要訪問到所有領域,並有DeserializeToObject

  3. 否。一般而言,其他方法不是句法糖並生成顯着不同的執行計劃。例如:

    ds.select($"foo").explain 
    
    == Physical Plan == 
    LocalTableScan [foo#117] 
    

    相比,纔可以直接訪問列中顯示的計劃。它不是API的限制,而是操作語義差異的結果。

  4. 如何在沒有map語句的情況下df.select(「foo」)類型安全?

    沒有這樣的選擇。而類型化的列允許您以靜態Dataset轉換爲另一種靜態類型Dataset

    ds.select($"bar".as[Int]) 
    

    有沒有類型安全的。還有一些嘗試包括類型安全優化操作,like typed aggregations,但是這個實驗性的API。

  5. 我爲什麼要使用UDF/UADF不是地圖

    這完全取決於你的。 Spark中的每個分佈式數據結構都有自己的優點和缺點(例如參見Spark UDAF with ArrayType as bufferSchema performance issues)。

就個人而言,我覺得靜態類型Dataset是最無用的:

  • 不要提供相同的一系列優化爲Dataset[Row](雖然它們共享存儲格式和一些執行計劃優化其沒有充分受益於代碼生成或堆外存儲),也無法訪問DataFrame的所有分析功能。

  • 類型轉換是黑盒子,有效地爲優化器創建分析障礙。例如選擇(過濾器)不能被推過輸入變換:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain 
    
    == Physical Plan == 
    *Filter (foo#133 = 1) 
    +- *Filter <function1>.apply 
        +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) 
         +- Exchange hashpartitioning(foo#133, 200) 
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) 
          +- LocalTableScan [foo#133, bar#134] 
    

    相比:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain 
    
    == Physical Plan == 
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) 
    +- Exchange hashpartitioning(foo#133, 200) 
        +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) 
         +- *Filter (foo#133 = 1) 
         +- LocalTableScan [foo#133, bar#134] 
    

    這會影響設有像謂詞下推或投影下推。

  • 不像RDDs那樣靈活,只有一小部分本地支持的類型。

  • 當使用as方法轉換Dataset時,使用Encoders的「類型安全性」存在爭議。由於數據形狀不是使用簽名編碼的,因此編譯器只能驗證是否存在Encoder

相關問題: