我在PostgreSQL中擁有100萬行和100列以上的數據源,並且我想使用Spark SQL,因此我想將此數據源轉換爲SchemaRDD
。將PostgreSQL數據庫加載到SchemaRDD
兩種方法在Spark SQL Programming Guide引入, 一種是通過反射,這意味着我需要定義:
case class Row(Var1: Int, Var2: String, ...)
這是繁瑣的,因爲我有超過100個列。
另一種辦法是「編程指定模式」,這意味着我需要定義:
val schema =
StructType(
Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...))
這對我來說也是乏味。
def extractValues(r: ResultSet) = {
(r.getInt("Var1"), r.getString("Var2"), ...)
}
val dbRDD = new JdbcRDD(sc, createConnection,
"SELECT * FROM PostgreSQL OFFSET ? LIMIT ?",
0, 1000000, 1, extractValues)
這個API:
事實上,因爲我加載使用JdbcRDD
類我PostgreSQL
數據庫,但我發現我還需要在mapRow
參數JdbcRDD
構造的,它看起來像定義模式仍然有一個問題仍然要求我自己創建架構,更糟糕的是,我需要重做類似的東西來將這個JdbcRDD
轉換爲SchemaRDD
,那將是非常笨拙的代碼。
所以我想知道什麼是這個任務的最佳方法?