
I get an error while trying to query a SQL Server table from Spark, please help. The SQL error when querying the table is:

Specifying database name or other qualifiers are not allowed for temporary tables. If the table name has dots (.) in it, please quote the table name with backticks (`).

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// "query" is a SQL string defined elsewhere; this is the call that fails
val querytest = sqlContext.sql(query)

val prop = new Properties()
val url2 = "jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014"
prop.setProperty("user", "admin")
prop.setProperty("password", "oracle")

val test = sqlContext.read.jdbc(url2, "Customer", prop)
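
The error comes from sqlContext.sql: in Spark 1.x it resolves table names against Spark's own catalog, and a dot-qualified name such as AdventureWorks2014.dbo.Customer trips the temporary-table check quoted above. A minimal sketch of the usual workaround (assuming Spark 1.4+; the column list is borrowed from the working config below): hand the query to the JDBC reader as a parenthesised subquery aliased as a table, so SQL Server executes it and Spark only sees the result set.

// Sketch: push the query down to SQL Server as "(subquery) as alias"
// instead of running it through sqlContext.sql.
val props = new java.util.Properties()
props.setProperty("user", "admin")
props.setProperty("password", "oracle")

val pushdown = "(select top 100 CustomerID, StoreID from Customer) as customer"
val pushed = sqlContext.read.jdbc(url2, pushdown, props)
pushed.show()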

Changes made to get the code working:

package com.kali.db

/**
 * Created by kalit_000 on 06/12/2015.
 */

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.springframework.context.support.ClassPathXmlApplicationContext

// Value class populated by Spring from sparkDBExtractorMpp.xml
case class SparkSqlValueClassMPP(driver: String, url: String, username: String,
                                 password: String, table: String, opdelimeter: String,
                                 lowerbound: String, upperbound: String,
                                 numberofparitions: String, parallelizecolumn: String)

object SparkDBExtractorMPP {

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkDBExtractorMPP")
      .set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    // Convert each Row to a delimited string for text output
    def opfile(value: DataFrame, delimeter: String): RDD[String] =
      value.map(x => x.toString.replace("[", "").replace("]", "").replace(",", delimeter))

    // Read the application context file
    val ctx = new ClassPathXmlApplicationContext("sparkDBExtractorMpp.xml")
    val DBinfo = ctx.getBean("SparkSQLDBExtractorMPP").asInstanceOf[SparkSqlValueClassMPP]

    val driver = DBinfo.driver
    val url = DBinfo.url
    val username = DBinfo.username
    val password = DBinfo.password
    val table = DBinfo.table
    val opdelimeter = DBinfo.opdelimeter
    val lowerbound = DBinfo.lowerbound.toInt
    val upperbound = DBinfo.upperbound.toInt
    val numberofpartitions = DBinfo.numberofparitions.toInt
    val parallelizecolumn = DBinfo.parallelizecolumn

    println("DB Driver:-%s".format(driver))
    println("DB Url:-%s".format(url))
    println("Username:-%s".format(username))
    println("Password:-%s".format(password))
    println("Table:-%s".format(table))
    println("Opdelimeter:-%s".format(opdelimeter))
    println("Lowerbound:-%s".format(lowerbound))
    println("Upperbound:-%s".format(upperbound))
    println("Numberofpartitions:-%s".format(numberofpartitions))
    println("Parallelizecolumn:-%s".format(parallelizecolumn))

    try {
      val props = new Properties()
      props.put("user", username)
      props.put("password", password)
      props.put("driver", driver)

      // Partitioned JDBC read: Spark issues one query per partition,
      // splitting parallelizecolumn over [lowerbound, upperbound]
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val df = sqlContext.read.jdbc(url, table, parallelizecolumn, lowerbound, upperbound, numberofpartitions, props)

      df.show(10)

      opfile(df, opdelimeter).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")

    } catch {
      case e: Exception => e.printStackTrace
    }
    sc.stop()
  }
}
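
One caveat with opfile above: it builds each output line by string-munging Row.toString, so a value that itself contains a comma or a square bracket gets corrupted. A sketch of a safer variant (same Spark 1.x DataFrame API), reading the Row's fields directly:

// Builds the line from the Row's fields instead of Row.toString, so
// embedded commas/brackets in the data survive the conversion.
def opfileSafe(value: DataFrame, delimeter: String): RDD[String] =
  value.map(row => (0 until row.length).map(i => row.get(i)).mkString(delimeter))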

I use a Spring bean to make the Spark code configurable:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
    "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
    <bean id="queryProps" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    </bean>

    <bean id="SparkSQLDBExtractorMPP" class="com.kali.db.SparkSqlValueClassMPP">
        <!-- constructor-args are positional: driver, url, username, password,
             table, opdelimeter, lowerbound, upperbound, numberofparitions,
             parallelizecolumn -->
        <constructor-arg value="com.microsoft.sqlserver.jdbc.SQLServerDriver" />
        <constructor-arg value="jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014" />
        <constructor-arg value="admin" />
        <constructor-arg value="oracle" />
        <constructor-arg value="(select top 100 CustomerID,StoreID,TerritoryID,AccountNumber,ModifiedDate from customer) as customer" />
        <constructor-arg value="~" />
        <constructor-arg value="1" />
        <constructor-arg value="100" />
        <constructor-arg value="8" />
        <constructor-arg value="CustomerID" />
    </bean>
</beans>
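
For readers not using Spring, the bean above is equivalent to constructing the case class directly; a sketch with the same positional values. The key detail is the table argument, a parenthesised subquery aliased as a table, which is what sidesteps the temporary-table error from the question:

val DBinfo = SparkSqlValueClassMPP(
  driver            = "com.microsoft.sqlserver.jdbc.SQLServerDriver",
  url               = "jdbc:sqlserver://localhost;user=admin;password=oracle;database=AdventureWorks2014",
  username          = "admin",
  password          = "oracle",
  table             = "(select top 100 CustomerID,StoreID,TerritoryID,AccountNumber,ModifiedDate from customer) as customer",
  opdelimeter       = "~",
  lowerbound        = "1",
  upperbound        = "100",
  numberofparitions = "8",
  parallelizecolumn = "CustomerID")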

Answer


If you are using Spark 1.4, the load function is deprecated:

public DataFrame load(String path, String source)
Deprecated. As of 1.4.0, replaced by read().format(source).load(path).
Returns the dataset stored at path as a DataFrame, using the given data source.
Parameters:
path - (undocumented)
source - (undocumented)
Returns:
(undocumented)

The new way to do this is shown here: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/
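
The linked post is in Java; here is a sketch of the replacement API in Scala, reusing the url/table/driver variables from the question:

// Spark 1.4+ replacement for the deprecated load(path, source):
// go through the DataFrameReader with the jdbc source.
val df = sqlContext.read
  .format("jdbc")
  .options(Map(
    "url"     -> url,    // jdbc:sqlserver://... connection string
    "dbtable" -> table,  // a table name or "(subquery) as alias"
    "driver"  -> driver))
  .load()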


I don't want the column names to be hardcoded while fetching data from the result set; is there another way? I want to reuse my code by passing the values (db, url, username, password, query) in through the Spring framework, so the code can connect to any database and dump the data to a file. I tried JdbcRDD; the only problem is that we have to name columns as in r.getString(1), which I don't want: val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url, username, password), query + " where ? = ?", 1, 1, 1, r => r.getString(1) + "," + r.getString(2) + "," + r.getString(3)) –
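
One way to drop the hardcoded r.getString(1), r.getString(2), ... calls, sketched below under the same sc/url/username/password/query variables as the comment above: ask the ResultSet's metadata how many columns the query produced and iterate over them.

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// Turn a ResultSet row into a delimited line without naming any column:
// ResultSetMetaData reports the column count of whatever query ran.
def rowToLine(r: ResultSet, delimiter: String): String = {
  val n = r.getMetaData.getColumnCount
  (1 to n).map(i => r.getString(i)).mkString(delimiter)
}

val myRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  query + " where ? = ?",  // JdbcRDD requires two bind placeholders
  1, 1, 1,                 // lowerBound, upperBound, numPartitions
  r => rowToLine(r, ","))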


I'm not sure I follow, but I think you can pass the column names in via a variable, or you can load the whole table into a DataFrame with select * and then select within the DataFrame, which is more flexible and faster – zt1983811


Can I write plain SQL against the df? val sqlContext = new org.apache.spark.sql.SQLContext(sc); val df = sqlContext.read.format("jdbc").options(Map("url" -> url, "dbtable" -> "customer", "query" -> query, "driver" -> driver)).load() –
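
A sketch of how that can work on Spark 1.x (note: the JDBC source had no "query" option in 1.x, so the plain SQL goes through a registered temp table instead; the column names here come from the config above):

// Load the JDBC table, register it under a name Spark's catalog knows,
// then run ordinary SQL against that name.
val df = sqlContext.read.format("jdbc")
  .options(Map("url" -> url, "dbtable" -> "customer", "driver" -> driver))
  .load()
df.registerTempTable("customer")
sqlContext.sql("select CustomerID, StoreID, TerritoryID from customer").show()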