2
在下面的代碼段中,tryParquet
函數嘗試,如果它的存在是爲了從鑲木文件加載一個數據集。如果不是,它計算,仍然存在並返回其提供的數據集的計劃:通用T作爲火花數據集[T]的構造
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
sealed trait CustomRow
case class MyRow(
id: Int,
name: String
) extends CustomRow
val ds: Dataset[MyRow] =
Seq((1, "foo"),
(2, "bar"),
(3, "baz")).toDF("id", "name").as[MyRow]
def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
Try(session.read.parquet(path)) match {
case Success(df) => df.as[T] // <---- compile error here
case Failure(_) => {
target.write.parquet(path)
target
}
}
val readyDS: Dataset[MyRow] =
tryParquet(spark, "/path/to/file.parq", ds)
然而,這產生於df.as[T]
編譯錯誤:
無法找到存儲在數據集型編碼器。原始類型(int,字符串等)和產品類型(case類)被導入序列化的其他類型將在未來的版本中添加spark.implicits._
支持支持。
情況下成功(DF)=> df.as [T]
人們可以通過使tryParquet
繞過這個問題投df
返回一個無類型DataFrame
和讓呼叫者鑄造爲所需的構造。但是有我們想要的類型由函數內部管理的情況下,任何解決方案?