2017-01-12 47 views
0

這讓我感到困惑。我正在使用「spark-testing-base_2.11」%「2.0.0_0.5.0」進行測試。任何人都可以解釋爲什麼如果使用數據集,地圖功能會改變模式,但如果我使用RDD則可以工作嗎?任何見解非常感謝。火星地圖操作更改模式

import com.holdenkarau.spark.testing.SharedSparkContext 
import org.apache.spark.sql.{ Encoders, SparkSession } 
import org.scalatest.{ FunSpec, Matchers } 

class TransformSpec extends FunSpec with Matchers with SharedSparkContext { 
    describe("data transformation") { 
    it("the rdd maintains the schema") { 
     val spark = SparkSession.builder.getOrCreate() 
     import spark.implicits._ 

     val personEncoder = Encoders.product[TestPerson] 
     val personDS = Seq(TestPerson("JoeBob", 29)).toDS 
     personDS.schema shouldEqual personEncoder.schema 

     val mappedSet = personDS.rdd.map { p: TestPerson => p.copy(age = p.age + 1) }.toDS 
     personEncoder.schema shouldEqual mappedSet.schema 
    } 

    it("datasets choke on explicit schema") { 
     val spark = SparkSession.builder.getOrCreate() 
     import spark.implicits._ 

     val personEncoder = Encoders.product[TestPerson] 
     val personDS = Seq(TestPerson("JoeBob", 29)).toDS 

     personDS.schema shouldEqual personEncoder.schema 

     val mappedSet = personDS.map[TestPerson] { p: TestPerson => p.copy(age = p.age + 1) } 
     personEncoder.schema shouldEqual mappedSet.schema 
    } 
    } 
} 

case class TestPerson(name: String, age: Int) 

回答

0

地圖是對數據的轉換操作。它接受輸入和函數,並將該函數應用於輸入數據的所有元素。輸出是該函數的返回值集合。所以輸出數據的schmea取決於函數的返回類型。 在函數式編程中,映射操作是一個相當標準和大量使用的操作。如果你想了解更多,請看https://en.m.wikipedia.org/wiki/Map_(higher-order_function)

+0

嗨Tapan。我熟悉地圖的功能。我想要修改一組記錄(查看示例,它需要一個TestPerson,並在其年齡上添加一年)。但是,如果查看返回的TestPerson的模式,它與系統所說的不同(personEncoder)。我不明白的是爲什麼它改變了模式。感謝名單。 – Richard

1

有幾件事情在這裏陰謀反對你。 Spark似乎對它認爲可以爲空的類型有特殊的外殼。

case class TestTypes(
     scalaString: String, 
     javaString: java.lang.String, 
     myString: MyString, 
     scalaInt: Int, 
     javaInt: java.lang.Integer, 
     myInt: MyInt 
    ) 

    Encoders.product[TestTypes].schema.printTreeString results in: 
    root 
    |-- scalaString: string (nullable = true) 
    |-- javaString: string (nullable = true) 
    |-- myString: struct (nullable = true) 
    | |-- value: string (nullable = true) 
    |-- scalaInt: integer (nullable = false) 
    |-- javaInt: integer (nullable = true) 
    |-- myInt: struct (nullable = true) 
    | |-- value: integer (nullable = false) 

,但如果你映射的類型,您將結束一切可空

val testTypes: Seq[TestTypes] = Nil 
val testDS = testTypes.toDS 
testDS.map(foo => foo).mapped.schema.printTreeString results in everything being nullable: 
root 
|-- scalaString: string (nullable = true) 
|-- javaString: string (nullable = true) 
|-- myString: struct (nullable = true) 
| |-- value: string (nullable = true) 
|-- scalaInt: integer (nullable = true) 
|-- javaInt: integer (nullable = true) 
|-- myInt: struct (nullable = true) 
| |-- value: integer (nullable = true) 
  • 即使你強迫的模式是正確的,星火明確地忽略非空在應用模式時進行比較,這就是爲什麼當您轉換回類型化表示時,您將失去少數可以保證的可空性。
  • 你可以豐富你的類型,以便能夠強制非空的模式:

    implicit class StructImprovements(s: StructType) { 
        def nonNull: StructType = StructType(s.map(_.copy(nullable = false))) 
        } 
    
    implicit class DsImprovements[T: Encoder](ds: Dataset[T]) { 
        def nonNull: Dataset[T] = { 
         val nnSchema = ds.schema.nonNull 
         applySchema(ds.toDF, nnSchema).as[T] 
        } 
        } 
    
    val mappedSet = personDS.map { p => 
        p.copy(age = p.age + 1) 
        }.nonNull 
    

    但你會發現應用任何有趣的操作時,然後再比較模式時,如果形狀是除同它蒸發Spark的可空性將以相同的方式傳遞。

    這似乎是由設計https://github.com/apache/spark/pull/11785