2017-04-02 22 views
3

我想使用Spark來處理來自JDBC源的一些數據。但是,首先,我不想從JDBC讀取原始表,而是想在JDBC端運行一些查詢來過濾列和連接表,並將查詢結果作爲Spark SQL中的表加載。如何在jdbc數據源中使用dbtable選項的子查詢?

以下語法來加載原始JDBC表對我的作品:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306", 
    dbtable="mydb.table1", 
    user="me", 
    password="******", 
    driver="com.mysql.jdbc.Driver" # mysql JDBC driver 5.1.41 
).load() 
df_table1.show() # succeeded 

據星火documentation(我使用PySpark 1.6.3):

DBTABLE:該JDBC表應該閱讀。請注意,可以使用任何在SQL查詢的FROM子句中有效的 。例如,您可以在圓括號中使用子查詢而不是 全表。

所以只是爲了實驗,我試着像這樣簡單的東西:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306", 
    dbtable="(SELECT * FROM mydb.table1) AS table1", 
    user="me", 
    password="******", 
    driver="com.mysql.jdbc.Driver" 
).load() # failed 

它拋出以下異常:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'table1 WHERE 1=0' at line 1 

我也嘗試了語法的一些其他變化(增加/刪除括號,刪除'as'子句,切換案例等),沒有任何運氣。那麼正確的語法是什麼?我在哪裏可以找到更詳細的語法文檔?此外,錯誤信息中這個奇怪的「WHERE 1 = 0」來自哪裏?謝謝!

+0

從我的角度來看,你只需要指定你試圖拉入的表格,這樣就省略了選擇的表述。 0 = 1來自您未指定的參數。看看[Dataframe Reader]的源代碼(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader。 scala#L207-L216) –

+0

這個http://stackoverflow.com/q/32628717/1305344看起來很相似,但它與PostgreSQL(不是MySQL)相似。 –

回答

1

對於使用SQL星火SQL查詢從JDBC源讀取數據,你可以嘗試這樣的事:

val df_table1 = sqlContext.read.format("jdbc").options(Map(
    ("url" -> "jdbc:postgresql://localhost:5432/mydb"), 
    ("dbtable" -> "(select * from table1) as table1"), 
    ("user" -> "me"), 
    ("password" -> "******"), 
    ("driver" -> "org.postgresql.Driver")) 
).load() 

我嘗試了使用PostgreSQL。您可以根據MySQL進行修改。

+0

看來我和你的答案一樣,除了我使用python。我的PySpark特有的代碼可能有一些語法錯誤? – Dichen

0

我認爲這可能是Spark SQL中的一個錯誤。

似乎thisthis line給你的錯誤。兩者都使用Scala字符串插值來替換tabledbtable

s"SELECT * FROM $table WHERE 1=0" 

這就是你可以從你所面臨由於上述模式將成爲錯誤發現table1 WHERE 1=0

SELECT * FROM (select * from table1) as table1 WHERE 1=0 

看起來不正確。

有的確是一個MySQL特有的方言 - MySQLDialect - 重寫getTableExistsQueryits own

override def getTableExistsQuery(table: String): String = { 
    s"SELECT 1 FROM $table LIMIT 1" 
} 

,所以我的辦法是,其他方法getSchemaQuery是錯誤的來源。如果您使用Spark 1.6.3,而此方法有@Since("2.1.0")標記,則這種方法非常不可靠。

我強烈建議檢出MySQL數據庫的日誌並查看導致錯誤消息的執行的查詢。

相關問題