2016-05-27

Pivot String column on PySpark DataFrame

I have a simple dataframe like this:

rdd = sc.parallelize(
    [ 
     (0, "A", 223,"201603", "PORT"), 
     (0, "A", 22,"201602", "PORT"), 
     (0, "A", 422,"201601", "DOCK"), 
     (1,"B", 3213,"201602", "DOCK"), 
     (1,"B", 3213,"201601", "PORT"), 
     (2,"C", 2321,"201601", "DOCK") 
    ] 
) 
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"]) 

df_data.show() 
+---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

and I need to pivot it by date:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show() 

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|2321.0|  null|  null|
|  0|   A| 422.0|  22.0| 223.0|
|  1|   B|3213.0|3213.0|  null|
+---+----+------+------+------+

Everything works as expected. But now I need to pivot it and get a non-numeric column:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show() 

and of course I get an exception:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;' 

I would like to generate something along the lines of:
+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|  DOCK|  null|  null|
|  0|   A|  DOCK|  PORT|  DOCK|
|  1|   B|  DOCK|  PORT|  null|
+---+----+------+------+------+

Is this possible with pivot?

Answer


Assuming that (id | type | date) combinations are unique and your only goal is pivoting and not aggregation, you can use first (or any other function not restricted to numeric values):

from pyspark.sql.functions import first 

(df_data 
    .groupby(df_data.id, df_data.type) 
    .pivot("date") 
    .agg(first("ship")) 
    .show()) 

## +---+----+------+------+------+ 
## | id|type|201601|201602|201603| 
## +---+----+------+------+------+ 
## | 2| C| DOCK| null| null| 
## | 0| A| DOCK| PORT| PORT| 
## | 1| B| PORT| DOCK| null| 
## +---+----+------+------+------+ 

If these assumptions are not correct you'll have to pre-aggregate your data, for example picking the most common ship value:

from pyspark.sql.functions import max, struct 

(df_data 
    .groupby("id", "type", "date", "ship") 
    .count() 
    .groupby("id", "type") 
    .pivot("date") 
    .agg(max(struct("count", "ship"))) 
    .show()) 

## +---+----+--------+--------+--------+ 
## | id|type| 201601| 201602| 201603| 
## +---+----+--------+--------+--------+ 
## | 2| C|[1,DOCK]| null| null| 
## | 0| A|[1,DOCK]|[1,PORT]|[1,PORT]| 
## | 1| B|[1,PORT]|[1,DOCK]| null| 
## +---+----+--------+--------+--------+