2016-11-22 109 views
3

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html很好地解釋了轉軸如何工作的火花。沒有聚合的火花轉軸

在我的Python代碼,我用熊貓沒有聚集,但是重置索引和連接:

pd.pivot_table(data=dfCountries, index=['A'], columns=['B']) 
countryToMerge.index.name = 'ISO' 
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner') 

這是如何工作的火花?

我試着組,並加入手動喜歡:

val grouped = countryKPI.groupBy("A").pivot("B") 
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show 

,但不起作用。 reset_index如何適應spark?它如何以spark原生方式實現?

編輯

的Python代碼的一個小例子:

import pandas as pd 
from datetime import datetime, timedelta 
import numpy as np 
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"]) 
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO']) 
dates['ISO'] = isos.ISO 
dates['ISO'] = dates['ISO'].astype("category") 
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'], 
         'indicator_id':['a','a','b','b'], 
         'value':[7,8,9,7]}) 
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id']) 
countryToMerge.index.name = 'ISO' 
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')) 

    dates ISO a b 
0 2016-01-01 ABC 7 9 
1 2016-01-03 ABC 7 9 
2 2016-01-05 ABC 7 9 
3 2016-01-07 ABC 7 9 
4 2016-01-09 ABC 7 9 
5 2016-01-02 POL 8 7 
6 2016-01-04 POL 8 7 
7 2016-01-06 POL 8 7 
8 2016-01-08 POL 8 7 
9 2016-01-10 POL 8 7 

以沿階遵循/火花

val dates = Seq(("2016-01-01", "ABC"), 
    ("2016-01-02", "ABC"), 
    ("2016-01-03", "POL"), 
    ("2016-01-04", "ABC"), 
    ("2016-01-05", "POL"), 
    ("2016-01-06", "ABC"), 
    ("2016-01-07", "POL"), 
    ("2016-01-08", "ABC"), 
    ("2016-01-09", "POL"), 
    ("2016-01-10", "ABC") 
).toDF("dates", "ISO") 
    .withColumn("dates", 'dates.cast("Date")) 

    dates.show 
    dates.printSchema 

    val countryKPI = Seq(("ABC", "a", 7), 
    ("ABC", "b", 8), 
    ("POL", "a", 9), 
    ("POL", "b", 7) 
).toDF("country_id3", "indicator_id", "value") 

    countryKPI.show 
    countryKPI.printSchema 

val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id") 

回答

0

下面的代碼片斷似乎工作 - 但我不知道如果通過avg彙總是正確的 - 即使「擬合數字」是輸出。

countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value").show 

我不知道這是否是「低效」的數據相比,只是重複使用的值更大的量(平均)(因爲我不想要聚合)。

+0

即使我正在尋找沒有聚合的樞軸功能。 @Georg Heiler,你同時找到了什麼? – user3560220

+0

只有在這裏發佈的內容。不幸。 –