I have several Spark DataFrames (call them Table A, Table B, etc.). I want to add a column to Table A based on the result of a query against one of the other tables, but the table to query changes from row to row depending on the value of a field in Table A, so the query has to be parameterized. Below I lay out an example of the problem: adding a column to a Spark DataFrame from a parameterized SQL query that depends on the values of some of the DataFrame's fields.
Every table has an OID column and a TableName column holding the name of that table, plus its own other columns (a minimal sketch of two of these tables as DataFrames appears right after the example tables below).
This is the fixed query to be performed on Tab A to add the new column:
Select $ColumnName from $TableName where OID=$oids
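For the first row of Tab A below, for example, the template expands to the following (values taken from the example tables that follow):
Select Title from Book where OID=2   -- expected result: harry potter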
Tab A
| oids | TableName | ColumnName  | other fields | New column: ValueOidDb      |
===============================================================================
|  2   | Book      | Title       | x            | query result: harry potter  |
|  8   | Book      | Isbn        | y            | query result: 556           |
|  1   | Author    | Name        | z            | query result: Tolkien       |
|  4   | Category  | Description | b            | query result: Commedy       |
Tab Book
| OID |TableName |Title |Isbn |other fields|
================================================================
| 2 | Book |harry potter| 123 | x |
| 8 | Book | hobbit | 556 | y |
| 21 | Book | etc | 8942 | z |
| 5 | Book | etc2 | 984 | b |
Tab Author
| OID |TableName |Name |nationality |other fields|
================================================================
| 5 | Author |J.Rowling | eng | x |
| 2 | Author |Geor. Martin| us | y |
| 1 | Author | Tolkien | eng | z |
| 13 | Author | Dan Brown | us | b |
Tab Category
| OID | TableName | Description |
=====================================
| 12  | Category  | Fantasy     |
|  4  | Category  | Commedy     |
|  9  | Category  | Thriller    |
|  7  | Category  | Action      |
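To make the setup concrete, the tables can be thought of as DataFrames registered roughly like this (a minimal sketch only; sc and sqlContext come from the notebook, and the variable names tabA and tabBook are just for illustration):
import sqlContext.implicits._

// Tab A: one lookup to perform per row.
val tabA = sc.parallelize(Seq(
  (2, "Book",     "Title",       "x"),
  (8, "Book",     "Isbn",        "y"),
  (1, "Author",   "Name",        "z"),
  (4, "Category", "Description", "b")
)).toDF("oids", "TableName", "ColumnName", "otherFields")

// Tab Book: OID and TableName plus the table's own columns.
val tabBook = sc.parallelize(Seq(
  (2,  "Book", "harry potter", "123",  "x"),
  (8,  "Book", "hobbit",       "556",  "y"),
  (21, "Book", "etc",          "8942", "z"),
  (5,  "Book", "etc2",         "984",  "b")
)).toDF("OID", "TableName", "Title", "Isbn", "otherFields")

// Register them so they can be queried by name from SQL.
tabA.registerTempTable("TabA")
tabBook.registerTempTable("Book")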
I tried using this UDF:
def setValueOid = (oid: Int, tableName: String, tableColumn: String) => {
  // Look up the value of `tableColumn` in the table called `tableName` for the given OID.
  try {
    sqlContext.sql(s"SELECT $tableColumn FROM $tableName WHERE OID = $oid").first().toString()
  } catch {
    case _: java.lang.NullPointerException => "error"
  }
}
sqlContext.udf.register("setValueOid", setValueOid)
val FinalRtxf = sqlContext.sql("SELECT all the columns of TAB A,"
  + " setValueOid(oid, Table, AttributeDatabaseColumn) as ValueOidDb"
  + " FROM TAB A")
I put the try/catch in the code because otherwise it threw a NullPointerException, but it does not work: it always returns "error". If I call the function directly, outside of a SQL query, just passing some parameters by hand, it works perfectly:
val try=setValueOid(8,"BOOK","ISBN")
try: String = [0977326403 ]
I read here that it is not possible to run a query inside a UDF: Trying to execute a spark sql query from a UDF
So how can I solve my problem? I do not know how to write the parameterized join. I tried this (a rough Scala sketch of what I am actually trying to express follows the exception below):
%sql
Select all the attributes of TAB A
FROM TAB A as a
join (Select $AttributeDatabaseColumn, TableName from $Table where OID=$oid) as b
on a.Table=b.TableName
but it gives me this exception:
org.apache.spark.sql.AnalysisException: cannot recognize input near '$' 'AttributeDatabaseColumn' ',' in select clause; line 3 pos 1 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:318)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
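What I am trying to express, roughly, is to substitute the parameters on the driver side in Scala rather than inside the %sql text, since the parser clearly does not understand the $ tokens. Here is a rough, untested sketch of that idea; it assumes Tab A is registered as "TabA", every referenced table (Book, Author, ...) is registered under its own name, the column names are the ones from the example above, and a Spark 1.6 DataFrame.join that accepts a sequence of join columns plus a join type:
// Collect the distinct (TableName, ColumnName) pairs that actually need a lookup.
val tabA = sqlContext.table("TabA")
val pairs = tabA.select("TableName", "ColumnName").distinct().collect()

// Build one small lookup DataFrame per pair: the OID plus the looked-up value.
val lookups = pairs.map { row =>
  val tableName  = row.getString(0)
  val columnName = row.getString(1)
  sqlContext.sql(
    s"SELECT OID AS oids, '$tableName' AS TableName, '$columnName' AS ColumnName, " +
    s"CAST($columnName AS STRING) AS ValueOidDb FROM $tableName")
}

// Union the per-table results and join them back onto Tab A.
val allLookups = lookups.reduce(_ unionAll _)
val FinalRtxf = tabA.join(allLookups, Seq("oids", "TableName", "ColumnName"), "left_outer")

Is something along these lines the right direction, or is there a better way to do it?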
I am new to Spark. How can I transform every DataFrame (Book, Author, etc.) into this form? In this application the DataFrames happen to be Book, Author, etc., but my program will run against different applications, and the DataFrames may change over time (Tab A will always be there). I would like a general approach rather than one tied to this specific example, because I do not know a priori what Book, Author, etc. will be. Is that possible? Thanks – Thanas
Also, those tables (Book, Author, etc.) have thousands of rows, so I cannot do this mapping by hand; that would be crazy. – Thanas