2017-08-04 58 views
0

我正嘗試使用JDBC預處理。它導致ResultSet對象。我想將其轉換爲火花數據框。如何將java結果集轉換爲Spark數據框

object JDBCRead { 

val tableName:String = "TABLENAME" 
val url :String = "jdbc:teradata://TERADATA_URL/user=USERNAME,password=PWD,charset=UTF8,TYPE=FASTEXPORT,SESSIONS=10" 
val selectTable:String = "SELECT * FROM " + tableName +" sample 10"; 

val con : Connection = DriverManager.getConnection(url); 


val pstmt2: PreparedStatement = con.prepareStatement(selectTable) 

import java.sql.ResultSet 

val rs: ResultSet = pstmt2.executeQuery 



val rsmd: ResultSetMetaData = rs.getMetaData 
while(rs.next()!=null) 
{ 
    val k: Boolean = rs.next() 
    for(i<-1 to rsmd.getColumnCount) { 
    print(" " + rs.getObject(i)) 
    } 
    println() 
} 

} 

我想打電話從以上數據幀星火代碼,這樣我可以將數據加載到數據幀,並獲得滿意的結果更快分佈式。我需要使用PreparedStatement。我無法使用spark.jdbc.load,因爲Teradata的FASTEXPORT不支持jdbc加載。它必須與PreparedStatement

如何實現這一目標?我如何使用預處理和SELECT語句加載到Spark Dataframe中。

回答

1

-

AFAIK有2對這類要求 1. DataFrame 2. JdbcRDD

我提供JdbcRDD(因爲你是那麼具體到PreparedStatement的可用選項)

其中使用了prepareStatement內部採用compute方法。因此,您不需要創建連接並明確維護它(容易出錯)。

之後,您可以在到數據幀

結果轉換爲速度,你可以配置其他參數。

實施例代碼JdbcRDD的使用率低於..

import org.apache.log4j.{Level, Logger} 
    import org.apache.spark.SparkContext 
    import org.apache.spark.SparkContext.__ 
    import org.apache.spark.SparkConf 
    import org.apache.spark.rdd.JdbcRDD 
    import java.sql.{connection, DriverManager,ResultSet} 


    object jdbcRddExample { 
    def main(args: Array[String]) { 

     // Connection String  
     VAL URL = "jdbc:teradata://SERVER/demo" 
     val username = "demo" 
     val password = "Spark" 
     Class.forName("com.teradata.jdbc.Driver").newInstance 
     // Creating & Configuring Spark Context 
     val conf = new SparkConf().setAppName("App1").setMaster("local[2]").set("spark.executor.memory",1) 
     val sc = new SparkContext(conf) 
     println("Start...") 
     // Fetching data from Database 
     val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password), 
     "select first_name, last_name, gender from person limit ?,?", 
     3,5,1,r => r.getString("last_name") + "," +r.getString("first_name")) 
     // Displaying the content 
     myRDD.foreach(println) 
     // Saving the content inside Text File 
     myRDD.saveAsTextFile("c://jdbcrdd") 

     println("End...") 
    } 
    } 
相關問題