2015-03-24 44 views
5

沒有火花SQL 1.2.1廢棄警告,下面的代碼在1.3這是Spark 1.3中的一個迴歸錯誤嗎?

1.2.1曾爲停止工作(沒有任何廢棄警告)

val sqlContext = new HiveContext(sc) 
import sqlContext._ 
val jsonRDD = sqlContext.jsonFile(jsonFilePath) 
jsonRDD.registerTempTable("jsonTable") 

val jsonResult = sql(s"select * from jsonTable") 
val foo = jsonResult.zipWithUniqueId().map { 
    case (Row(...), uniqueId) => // do something useful 
    ... 
} 

foo.registerTempTable("...") 

停止1.3.0工作(根本不編譯,我所做的就是改變到1.3)

jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method 

不工作workar ound:

儘管這可能會給我一個RDD [行]:

jsonResult.rdd.zipWithUniqueId() 

現在這不會是RDD[Row]工作,不具有當然的registerTempTable方法

 foo.registerTempTable("...") 

這裏是我的問題

  1. 是否有解決方法? (例如,我只是做錯了嗎?)
  2. 這是一個錯誤? (我認爲任何停止編譯以前版本的工作,沒有@deprecated警告顯然是一個迴歸bug)

回答

5

這不是一個錯誤,但抱歉的混亂!直到Spark 1.3,Spark SQL被標記爲Alpha組件,因爲API仍在不斷變化。在Spark 1.3中,我們畢業並穩定了API。您可以在the documentation中找到移植時需要執行的操作的完整說明。

我也可以回答您的具體問題並給出我們爲什麼做出這些變化

一些理由

停在1.3.0工作(根本不編譯,我所做的就是改變到1.3) jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method

DataFrames現在是跨Scala和Java的統一接口。但是,由於我們必須保持與現有RDD API在1.X的其餘部分的兼容性,所以DataFrames不是RDDs。要獲得RDD表示你可以打電話df.rdddf.javaRDD

此外,因爲我們擔心的一些可與隱式轉換髮生的混亂,我們做到了,這樣你必須顯式調用rdd.toDF從RDD造成的轉換髮生。但是,如果您的RDD保存從Product繼承的對象(即元組或案例類),則此轉換僅適用於此轉換。回到最初的問題,如果要對具有任意模式的行進行轉換,您將需要在映射操作之後(因爲編譯器不能)明確告訴Spark SQL有關數據的結構。

import org.apache.spark.sql.types._ 
val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil)) 
val newSchema = 
    StructType(
    StructField("uniqueId", IntegerType) +: jsonData.schema.fields) 

val augmentedRows = jsonData.rdd.zipWithUniqueId.map { 
    case (row, id) => 
    Row.fromSeq(id +: row.toSeq) 
} 

val newDF = sqlContext.createDataFrame(augmentedRows, newSchema) 
+0

謝謝!我想我應該先閱讀手冊;)https:// spark。apache.org/docs/1.3.0/sql-programming-guide.html#interoperating-with-rdds – 2015-03-24 22:47:26