2014-12-28 91 views
1

我在PostgreSQL中擁有100萬行和100列以上的數據源,並且我想使用Spark SQL,因此我想將此數據源轉換爲SchemaRDD將PostgreSQL數據庫加載到SchemaRDD

兩種方法在Spark SQL Programming Guide引入, 一種是通過反射,這意味着我需要定義:

case class Row(Var1: Int, Var2: String, ...) 

這是繁瑣的,因爲我有超過100個列。

另一種辦法是「編程指定模式」,這意味着我需要定義:

val schema = 
    StructType(
    Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...)) 

這對我來說也是乏味。

def extractValues(r: ResultSet) = { 
    (r.getInt("Var1"), r.getString("Var2"), ...) 
} 
val dbRDD = new JdbcRDD(sc, createConnection, 
    "SELECT * FROM PostgreSQL OFFSET ? LIMIT ?", 
    0, 1000000, 1, extractValues) 

這個API:

事實上,因爲我加載使用JdbcRDD類我PostgreSQL數據庫,但我發現我還需要在mapRow參數JdbcRDD構造的,它看起來像定義模式仍然有一個問題仍然要求我自己創建架構,更糟糕的是,我需要重做類似的東西來將這個JdbcRDD轉換爲SchemaRDD,那將是非常笨拙的代碼。

所以我想知道什麼是這個任務的最佳方法?

回答

2

您需要支持的數據類型數量有限。爲什麼不使用

java.sql.ResultSetMetaData 

例如,

val rs = jdbcStatement.executeQuery("select * from myTable limit 1") 
val rmeta = rs.getMetaData 

要讀取一行,然後爲每個列動態生成所需的StructField。

您將需要一個case語句來處理

val myStructFields = for (cx <- 0 until rmeta.getColumnCount) { 
     val jdbcType = rmeta.getColumnType(cx) 
     } yield StructField(rmeta.getColumnName(cx),jdbcToSparkType(jdbcType)) 

val mySchema = StructType(myStructFields.toSeq) 

哪裏jdbcToSparkType是大致如下:

def jdbcToSparkType(jdbcType: Int) = { 
    jdbcType match { 
     case 4 => InteegerType 
     case 6 => FloatType 
     .. 
    } 

UPDATE要生成RDD [行]:你會遵循類似的模式。在這種情況下,你會

val rows = for (rs.next) { 
    row = jdbcToSpark(rs) 
    } yield row 

val rowRDD = sc.parallelize(rows) 

其中

def jdbcToSpark(rs: ResultSet) = { 
    var rowSeq = Seq[Any]() 
    for (cx <- 0 to rs.getMetaData.getColumnCount) { 
    rs.getColumnType(cx) match { 
     case 4 => rowSeq :+ rs.getInt(cx) 
      .. 
    } 
    } 
    Row.fromSeq(rowSeq) 
} 

然後 VAL行

相關問題