2016-10-18

Has anyone run into this and have an idea how to fix it? Error when performing an inner join on DataFrames in Spark 2.0.1

I have been trying to update my code to use Spark 2.0.1 and Scala 2.11. Everything worked happily in Spark 1.6.0 with Scala 2.10. I have a simple DataFrame-to-DataFrame inner join that returns an error. The data comes from AWS RDS Aurora. Note that the foo DataFrame below actually has 92 columns, not just the two I show; the problem persists even with only two columns.

Relevant information:

DataFrame 1 schema

foo.show() 

+--------------------+------+ 
|  Transaction ID| BIN| 
+--------------------+------+ 
|    bbBW0|134769| 
|    CyX50|173622| 
+--------------------+------+ 

foo.printSchema() 

root 
|-- Transaction ID: string (nullable = true) 
|-- BIN: string (nullable = true) 

DataFrame 2 schema

bar.show() 

+--------------------+-----------------+-------------------+ 
|    TranId|  Amount_USD|  Currency_Alpha| 
+--------------------+-----------------+-------------------+ 
|    bbBW0|   10.99|    USD| 
|    CyX50|   438.53|    USD| 
+--------------------+-----------------+-------------------+ 

bar.printSchema() 

root 
|-- TranId: string (nullable = true) 
|-- Amount_USD: string (nullable = true) 
|-- Currency_Alpha: string (nullable = true) 

Explain plan for the join

val asdf = foo.join(bar, foo("Transaction ID") === bar("TranId")) 
asdf.explain() 

== Physical Plan == 
*BroadcastHashJoin [Transaction ID#0], [TranId#202], Inner, BuildRight 
:- *Scan JDBCRelation((SELECT 

     ... 
     I REMOVED A BUNCH OF LINES FROM THIS PRINT OUT 
     ... 

    ) as x) [Transaction ID#0,BIN#8] PushedFilters: [IsNotNull(Transaction ID)], ReadSchema: struct<Transaction ID:string,BIN:string> 
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])) 
    +- *Filter isnotnull(TranId#202) 
     +- InMemoryTableScan [TranId#202, Amount_USD#203, Currency_Alpha#204], [isnotnull(TranId#202)] 
     : +- InMemoryRelation [TranId#202, Amount_USD#203, Currency_Alpha#204], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
     :  : +- Scan ExistingRDD[TranId#202,Amount_USD#203,Currency_Alpha#204] 

Now the error I get is this:

16/10/18 11:36:50 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6) 
java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'ID IS NOT NULL)' at line 54 

The full stack trace can be seen here: http://pastebin.com/C9bg2HFt

Nowhere in my code, nor in the JDBC query that pulls the data from the database, do I have `ID IS NOT NULL`. I spent a lot of time googling and found a commit to Spark that adds null filters to the join condition in the query plan. Here is the commit: https://git1-us-west.apache.org/repos/asf?p=spark.git;a=commit;h=ef770031
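The explain plan above hints at the cause: `PushedFilters: [IsNotNull(Transaction ID)]` shows the auto-added null filter being pushed down to MySQL, and the column name contains a space. If that name is interpolated into the WHERE clause without backticks (an assumption about Spark 2.0.x's JDBC source, not a quote of its code), MySQL parses `Transaction` and `ID` as two tokens and fails exactly at `'ID IS NOT NULL)'`. A simplified sketch of the malformed predicate:

```scala
// Hypothetical simplification of how a pushed-down IsNotNull filter is
// rendered into the JDBC WHERE clause; the real Spark code differs.
def isNotNullClause(col: String): String = s"($col IS NOT NULL)"

// Unquoted: MySQL sees two identifiers and errors near "ID IS NOT NULL)"
println(isNotNullClause("Transaction ID"))    // (Transaction ID IS NOT NULL)

// Backtick-quoted: legal MySQL identifier
println(isNotNullClause("`Transaction ID`"))  // (`Transaction ID` IS NOT NULL)
```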

Answer


Curious if you have tried the following:

val dfRenamed = bar.withColumnRenamed("TranId", "Transaction ID") 
val newDF = foo.join(dfRenamed, Seq("Transaction ID"), "inner") 
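Renaming `bar`'s key still leaves a join key containing a space on the JDBC side. An alternative is to alias the column away at read time, so Spark never sees an identifier with a space and any filter it pushes down stays legal MySQL. This is a sketch only: it assumes a SparkSession `spark`, a JDBC url `jdbcUrl`, and connection `props` defined elsewhere, and the table name `transactions` is hypothetical.

```scala
// Alias the space-containing column inside the dbtable subquery itself;
// downstream, Spark works with `TransactionId` and pushed filters are valid.
val query =
  """(SELECT `Transaction ID` AS TransactionId, BIN
    |   FROM transactions) AS x""".stripMargin

// val foo = spark.read.jdbc(jdbcUrl, query, props)
// foo.join(bar, foo("TransactionId") === bar("TranId")).explain()
println(query)
```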