Spark SQL UDF with complex input parameter

I am trying to use a UDF whose input type is an array of structs. I have the following data structure (only the relevant part of a larger structure is shown):
|--investments: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- funding_round: struct (nullable = true)
| | | |-- company: struct (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- permalink: string (nullable = true)
| | | |-- funded_day: long (nullable = true)
| | | |-- funded_month: long (nullable = true)
| | | |-- funded_year: long (nullable = true)
| | | |-- raised_amount: long (nullable = true)
| | | |-- raised_currency_code: string (nullable = true)
| | | |-- round_code: string (nullable = true)
| | | |-- source_description: string (nullable = true)
| | | |-- source_url: string (nullable = true)
I declared these case classes:
case class Company(name: String, permalink: String)
case class FundingRound(
  company: Company,
  funded_day: Long,
  funded_month: Long,
  funded_year: Long,
  raised_amount: Long,
  raised_currency_code: String,
  round_code: String,
  source_description: String,
  source_url: String)
case class Investments(funding_round: FundingRound)
The UDF declaration:
sqlContext.udf.register("total_funding", (investments: Seq[Investments]) => {
  val totals = investments.map(r => r.funding_round.raised_amount)
  totals.sum
})
When I execute the following transformation, the result is as expected:
scala> sqlContext.sql("""select total_funding(investments) from companies""")
res11: org.apache.spark.sql.DataFrame = [_c0: bigint]
However, when an action such as collect is executed, I get an error:
Executor: Exception in task 0.0 in stage 4.0 (TID 10)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments
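For context on why registration and the transformation succeed while only collect fails: Spark hands the UDF the struct elements as generic Row objects rather than instances of my case class, and because of JVM type erasure the cast to `Seq[Investments]` is only checked when an element is actually accessed. A minimal plain-Scala sketch of that mechanism (no Spark required; the tuple stands in for GenericRowWithSchema, and the trimmed case classes are stand-ins for the ones above):

```scala
// Stand-ins for the case classes in the question (trimmed to one field).
case class Company(name: String, permalink: String)
case class FundingRound(company: Company, raised_amount: Long)
case class Investments(funding_round: FundingRound)

object ErasureDemo {
  // Returns true iff touching an element of the "casted" Seq throws
  // ClassCastException -- i.e. the cast is deferred past asInstanceOf.
  def castFailsAtAccess(fromEngine: Seq[Any]): Boolean = {
    // Due to erasure this cast succeeds unconditionally: nothing about
    // the elements is checked yet (mirrors the UDF registering fine).
    val pretended = fromEngine.asInstanceOf[Seq[Investments]]
    try {
      // Only here is each element cast to Investments -- and it fails
      // (mirrors the UDF blowing up when collect forces evaluation).
      pretended.map(_.funding_round.raised_amount).sum
      false
    } catch {
      case _: ClassCastException => true
    }
  }

  def main(args: Array[String]): Unit = {
    // A tuple plays the role of the generic row Spark actually passes in.
    val fromSpark: Seq[Any] = Seq(("row-data", 42L))
    println(s"cast failed at element access: ${castFailsAtAccess(fromSpark)}")
  }
}
```

This is only an illustration of the failure mode, not a fix; the `fromSpark` value and trimmed classes are hypothetical stand-ins for what Spark passes at runtime.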
Thanks for any help.