2016-07-16

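Spark SQL UDF with a complex input parameter

I'm trying to use a UDF whose input type is an array of structs. I have the following data structure (only the relevant part of a larger schema is shown):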

|-- investments: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- funding_round: struct (nullable = true)
|    |    |    |-- company: struct (nullable = true)
|    |    |    |    |-- name: string (nullable = true)
|    |    |    |    |-- permalink: string (nullable = true)
|    |    |    |-- funded_day: long (nullable = true)
|    |    |    |-- funded_month: long (nullable = true)
|    |    |    |-- funded_year: long (nullable = true)
|    |    |    |-- raised_amount: long (nullable = true)
|    |    |    |-- raised_currency_code: string (nullable = true)
|    |    |    |-- round_code: string (nullable = true)
|    |    |    |-- source_description: string (nullable = true)
|    |    |    |-- source_url: string (nullable = true)

I declared the following case classes:

case class Company(name: String, permalink: String) 
case class FundingRound(company: Company, funded_day: Long, funded_month: Long, funded_year: Long, raised_amount: Long, raised_currency_code: String, round_code: String, source_description: String, source_url: String) 
case class Investments(funding_round: FundingRound) 

The UDF declaration:

sqlContext.udf.register("total_funding", (investments:Seq[Investments]) => { 
    val totals = investments.map(r => r.funding_round.raised_amount) 
    totals.sum 
}) 

When I execute the following transformation, the result is as expected:

scala> sqlContext.sql("""select total_funding(investments) from companies""") 
res11: org.apache.spark.sql.DataFrame = [_c0: bigint] 

However, when an action such as collect is executed, I get an error:

Executor: Exception in task 0.0 in stage 4.0 (TID 10) 
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments 

Thanks for any help.

Answer

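The error you see should be fairly self-explanatory. There is a strict mapping between Catalyst / SQL types and Scala types, which is described in the relevant section of the Spark SQL, DataFrames and Datasets Guide.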

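In particular, struct types are converted to org.apache.spark.sql.Row (in your particular case the data will be exposed as Seq[Row]), which is why the cast to the Investments case class fails at runtime.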
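As a minimal sketch of what this means for the UDF in the question, the version below accepts Seq[Row] instead of Seq[Investments] and reads the nested field by name. It assumes Spark 2.x or later with a local SparkSession (in the Spark 1.x shell from the question, the existing sqlContext would take its place). The case classes Round, Investment and CompanyRow, the helper raisedAmount, and the sample data are hypothetical, trimmed-down stand-ins that only reproduce the nesting of the original schema.

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical, trimmed-down stand-ins for the question's schema (sample data only).
case class Round(raised_amount: Option[Long])
case class Investment(funding_round: Round)
case class CompanyRow(name: String, investments: Seq[Investment])

// Assumption: Spark 2.x+; a local session is created so the sketch is self-contained.
val spark = SparkSession.builder().master("local[*]").appName("udf-row-sketch").getOrCreate()
import spark.implicits._

// Reads raised_amount from one array element, treating any null on the path as 0.
def raisedAmount(investment: Row): Long = {
  val round = if (investment == null) null else investment.getAs[Row]("funding_round")
  if (round == null) 0L
  else {
    val idx = round.fieldIndex("raised_amount")
    if (round.isNullAt(idx)) 0L else round.getLong(idx)
  }
}

// The UDF input is Seq[Row], because every struct in the array arrives as a Row.
val totalFunding: Seq[Row] => Long =
  investments => Option(investments).getOrElse(Seq.empty[Row]).map(raisedAmount).sum

spark.udf.register("total_funding", totalFunding)

val companies = Seq(
  CompanyRow("a", Seq(Investment(Round(Some(100L))), Investment(Round(None)))),
  CompanyRow("b", Seq.empty)
).toDF()
companies.createOrReplaceTempView("companies")

val totals = spark.sql("select name, total_funding(investments) as total from companies").collect()
```

Counting null amounts as zero is a choice made for this sketch; depending on the data, returning an Option and letting Spark produce a null total may be more appropriate.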

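There are different approaches that can be used to expose the data as specific types:

Only the former approach would be applicable in this particular case.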

If you want to use a UDF, you need to access investments.funding_round.raised_amount like this:

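import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
// Each array element is a struct, so it is exposed as a Row; Try guards against nulls.
val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try(
    investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount"))
).toOption)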

However, a simple select should be safer and cleaner:

df.select($"investments.funding_round.raised_amount")
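The select above returns one array of amounts per row rather than a single total. If the goal is the per-company sum from the question, one possible way to get it without a UDF is to explode that array and aggregate, as sketched below. This again assumes Spark 2.x or later; the case classes, the name column used as a grouping key, and the sample data are hypothetical and not part of the original schema.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, sum}

// Hypothetical, trimmed-down stand-ins for the question's schema (sample data only).
case class Round(raised_amount: Option[Long])
case class Investment(funding_round: Round)
case class CompanyRow(name: String, investments: Seq[Investment])

// Assumption: Spark 2.x+; a local session is created so the sketch is self-contained.
val spark = SparkSession.builder().master("local[*]").appName("nested-select-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  CompanyRow("a", Seq(Investment(Round(Some(100L))), Investment(Round(Some(50L))))),
  CompanyRow("b", Seq(Investment(Round(None))))
).toDF()

// Selecting a field through an array of structs yields one array of values per row.
val amounts = df.select($"name", $"investments.funding_round.raised_amount".as("amounts"))

// explode emits one row per array element; sum skips null amounts.
val totals = amounts
  .select($"name", explode($"amounts").as("raised_amount"))
  .groupBy($"name")
  .agg(sum($"raised_amount").as("total_funding"))
  .collect()
```

Note that explode drops rows whose array is empty or null, and a group containing only null amounts gets a null total, so this is not a drop-in replacement for a UDF that returns 0 in those cases.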