2016-05-14 44 views
0

我想將存儲在pyspark.sql.dataframe.DataFrameddf」中的事務按指示事務源(在本例中爲客戶ID)的列「key」進行分組。將Python Spark組事務轉換爲嵌套模式

分組是一個相當昂貴的過程,所以我想寫組磁盤嵌套模式:

(key, [[c1, c2, c3,...], ...]) 

這將讓我迅速地加載上的一個鍵的所有交易,並開發複雜自定義聚合器,而無需重新運行分組。

如何創建嵌套模式並將其寫入磁盤?

回答

0

我花了相當長的一段時間才弄清楚,雖然答案很簡單,所以我想我會在這裏發佈我的解決方案。

首先由key(客戶ID)減少所有的交易:

from operators import add 
# ddf is a dataframe with a transaction in each row. Key is the column 
# we want to group the transactions by. 

txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],)).reduceByKey(add) 

這給出了一個rdd看起來像(key, [list of Rows])。要將其寫回dataframe,您需要構建模式。交易清單可以模擬ArrayType

from pyspark.sql import types as sqxt 
txn_schema = sqxt.StructType([ 
    sqxt.StructField('Key', sqxt.StringType()), 
    sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema)) 
]) 

然後,它的直接的將數據寫入到磁盤中的這種結構:

txnddf = txnrdd.toDF(schema=txn_schema) 
txnddf.write.parquet('customer-transactions.parquet') 

表現似乎確定。如果不通過RDD,找不到方法。