2016-11-20 27 views
0

我有幾個Spark Dataframes(我們可以稱它們爲Table a,Table b等)。 我想根據對其中一個表的查詢結果向表a添加一列,但此表每次都會根據表a的某個字段的值進行更改。所以這個查詢應該是參數化的。 下面我列出一個例子來解決問題:根據參數化SQL查詢在Spark Dataframe中添加列,這取決於數據幀某些字段的值。

每個表都有OID列和TableName列以及當前表的名稱加上其他列。

This is the fixed query to be performed on Tab A to add new column: 

    Select $ColumnName from $TableName where OID=$oids 

    Tab A 
    | oids|TableName |ColumnName | other fields|New Column: ValueOidDb 
    ================================================================ 
    | 2 | Book  | Title  |  x  |result query:harry potter 
    | 8 | Book  | Isbn  |  y  |result query: 556 
    | 1 | Author | Name  |  z  |result query:Tolkien 
    | 4 | Category |Description|  b  |result query: Commedy 


    Tab Book 
    | OID |TableName |Title  |Isbn |other fields| 
    ================================================================ 
    | 2 | Book  |harry potter| 123 | x   | 
    | 8 | Book  | hobbit  | 556 | y   | 
    | 21 | Book  | etc  | 8942 | z   | 
    | 5 | Book  | etc2  | 984 | b   | 

    Tab Author 
    | OID |TableName  |Name  |nationality |other fields| 
    ================================================================ 
    | 5 | Author  |J.Rowling | eng  | x   | 
    | 2 | Author  |Geor. Martin| us   | y   | 
    | 1 | Author  | Tolkien | eng  | z   | 
    | 13 | Author  | Dan Brown | us   | b   | 


    | OID | TableName |Description | 
    ===================================== 
    | 12 | Category | Fantasy | 
    | 4 | Category | Commedy | 
    | 9 | Category | Thriller | 
    | 7 | Category | Action  | 

我試着用這個UDF

def setValueOid = (oid: Int,TableName: String, TableColumn: String) => { 

    try{ 
     sqlContext.sql(s"Select $currTableColumn from $currTableName where OID = $curroid ").first().toString() 
     } 
    catch{ 
     case x: java.lang.NullPointerException => "error" 
     } 

     } 
    sqlContext.udf.register("setValueOid", setValueOid) 

    val FinalRtxf = sqlContext.sql("SELECT all the column of TAB A ," 
       + " setValueOid(oid, Table,AttributeDatabaseColumn) as  ValueOidDb" 
       + " FROM TAB A") 

我把代碼中的一個嘗試捕捉,否則它給了我一個NullPointerException異常,但它不工作,因爲它總是會返回一個「問題」 。 如果我嘗試這個功能沒有SQL查詢的只是路過一些手動參數它完美的作品:

  val try=setValueOid(8,"BOOK","ISBN") 
      try: String = [0977326403 ]     FINISHED 
      Took 4 sec. Last updated by anonymous at November 20 2016, 3:29:28 AM. 

我讀到這裏,是不是可以做一個查詢UDF內 Trying to execute a spark sql query from a UDF

因此,如何能我解決了我的問題?我不知道如何進行參數化連接。我嘗試這樣做:

 %sql 
     Select all attributes TAB A,  
     FROM TAB A as a 
     join (Select $AttributeDatabaseColumn ,TableName from $Table where OID=$oid) as b 
     on a.Table=b.TableName 

,但它給了我這個例外:

org.apache.spark.sql.AnalysisException: cannot recognize input near '$' 'AttributeDatabaseColumn' ',' in select clause; line 3 pos 1  at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:318) 
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41) 
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40) 

回答

0

一個選項:

  • 變換每個BookAuthorCategory到窗體:

    root 
    |-- oid: integer (nullable = false) 
    |-- tableName: string (nullable = true) 
    |-- properties: map (nullable = true) 
    | |-- key: string 
    | |-- value: string (valueContainsNull = true) 
    

    例如在Book第一記錄:

    val book = Seq((2L, "Book", 
        Map("title" -> "harry potter", "Isbn" -> "123", "other field" -> "x") 
    )).toDF("oid", "title", "properties") 
    
    +---+---------+---------------------------------------------------------+ 
    |oid|tableName|properties            | 
    +---+---------+---------------------------------------------------------+ 
    |2 |Book  |Map(title -> harry potter, Isbn -> 123, other field -> x)| 
    +---+---------+---------------------------------------------------------+ 
    
  • 工會BookAuthorCategory作爲屬性。

    val properties = book.union(author).union(category) 
    
  • 加入與基表:

    val comb = properties.join(table, Seq($"oid", $"tableName")) 
    
  • 使用case when ...基於tableNameproperties字段添加新列。

+0

我是新的火花。我怎樣才能以這種形式轉換每個數據框(書,作者等)?在這個應用程序中,數據框也是書,作者等,但是我的程序將運行在不同的應用程序中,並且數據框可能會及時更改(表A將始終保留)。我想有一個通用的方法,而不是基於這個具體的例子,因爲我不知道先驗什麼將書,作者等這是可能的嗎?謝謝 – Thanas

+0

也那些表書作者等有千行,我無法手動執行此映射,這將是瘋狂的 – Thanas

相關問題