2017-09-21 66 views
2

在下面的代碼段中,tryParquet函數嘗試,如果它的存在是爲了從鑲木文件加載一個數據集。如果不是,它計算,仍然存在並返回其提供的數據集的計劃:通用T作爲火花數據集[T]的構造

import scala.util.{Try, Success, Failure} 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.Dataset 

sealed trait CustomRow 

case class MyRow(
    id: Int, 
    name: String 
) extends CustomRow 

val ds: Dataset[MyRow] = 
    Seq((1, "foo"), 
     (2, "bar"), 
     (3, "baz")).toDF("id", "name").as[MyRow] 

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] = 
    Try(session.read.parquet(path)) match { 
     case Success(df) => df.as[T] // <---- compile error here 
     case Failure(_) => { 
     target.write.parquet(path) 
     target 
     } 
    } 

val readyDS: Dataset[MyRow] = 
    tryParquet(spark, "/path/to/file.parq", ds) 

然而,這產生於df.as[T]編譯錯誤:

無法找到存儲在數據集型編碼器。原始類型(int,字符串等)和產品類型(case類)被導入序列化的其他類型將在未來的版本中添加spark.implicits._

支持支持。

情況下成功(DF)=> df.as [T]

人們可以通過使tryParquet繞過這個問題投df返回一個無類型DataFrame和讓呼叫者鑄造爲所需的構造。但是有我們想要的類型由函數內部管理的情況下,任何解決方案?

回答

5

看起來像它通過在類型參數使用Encoder可能:

import org.apache.spark.sql.Encoder 

def tryParquet[T <: CustomRow: Encoder](...) 

這種方式,編譯器可以證明構建對象時df.as[T]正在提供的編碼器。

相關問題