我花了相當長的一段時間才弄清楚,雖然答案很簡單,所以我想我會在這裏發佈我的解決方案。
首先由key
(客戶ID)減少所有的交易:
from operators import add
# ddf is a dataframe with a transaction in each row. Key is the column
# we want to group the transactions by.
txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],)).reduceByKey(add)
這給出了一個rdd
看起來像(key, [list of Rows])
。要將其寫回dataframe
,您需要構建模式。交易清單可以模擬ArrayType
。
from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
sqxt.StructField('Key', sqxt.StringType()),
sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])
然後,它的直接的將數據寫入到磁盤中的這種結構:
txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')
表現似乎確定。如果不通過RDD,找不到方法。