2015-02-09 26 views
5

我們有一個配置單元倉庫,並且希望使用火花來完成各種任務(主要是分類)。有時將結果寫回配置單元表。例如,我們編寫了下面的python函數來查找由original_table第一列分組的original_table第二列的總和。該函數可以工作,但我們擔心效率低下,特別是要轉換爲鍵值對的映射和字典版本。函數combiner,mergeValue,mergeCombiner在別處定義,但工作正常。從聚合後的火花表中讀取和寫入

from pyspark import HiveContext 

rdd = HiveContext(sc).sql('from original_table select *') 

#convert to key-value pairs 
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1]))) 

#create rdd where rows are (key, (sum, count) 
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner) 

# creates rdd with dictionary values in order to create schemardd 
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]}) 

# infer the schema 
schema_rdd = HiveContext(sc).inferSchema(dict_rdd) 

# save 
schema_rdd.saveAsTable('new_table_name') 

是否有更有效的方法來做同樣的事情?

+1

不知道爲什麼你必須轉換爲rdd,但是如果你堅持你可以只做'key_value_rdd.reduceByKey(lambda x,y:sum(x,y))'而不是'combineByKey'。 – mtoto 2017-02-28 11:09:38

回答

0

......也許這個問題在編寫時是不可能的,但現在使用createDataFrame()調用沒有意義(後1.3)?

得到你的第一個RDD後,它看起來像你可以進行調用,然後運行一個簡單的SQL語句對結構進行一次完成整個工作。 (Sum和Grouping)另外,如果我正確讀取API文檔,則DataFrame結構可以在創建時直接推斷模式。

http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext

0

這個錯誤可以通過hive.exec.scratchdir設置到用戶有權訪問

+1

這應該是我認爲的評論。 – ketan 2016-03-18 05:31:15

+0

你在說什麼錯誤? – mtoto 2017-02-28 11:05:22

0

您正在使用什麼版本火花的文件夾解決了嗎?

這個答案是根據1.6 &使用數據幀。

val sc = new SparkContext(conf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

import sqlContext.implicits._ 
val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt") 

    import org.apache.spark.sql.functions._ 
    client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show() 


+-----+---+-----+ 
|Categ|Sum|count| 
+-----+---+-----+ 
| A| 15| 2| 
| B| 56| 1| 
+-----+---+-----+ 

希望這有助於!