比方說,我有一個數據集採用以下形式減少組合的數量:星火 - 使用GROUPBY
data = sc.parallelize([('customer_1', 'contract_1', 15000, 100),
('customer_1', 'contract_1', 20000, 200),
('customer_2', 'contract_2', 30000, 100),
('customer_1', 'contract_1', 7500, 500)], 2)
其中:
- 第一列代表一個客戶ID。
- 第二欄代表一個合同ID。
- 第三列代表一個時間戳。
- 第四欄是合同價值。
我需要做的是添加一個額外的列,對於每一行,包含具有相同客戶ID,相同合同ID和時間戳等於或大於時間戳的所有行的合同值總和的當前行。
所以,以前的數據集,結果應該是:
customer_1 contract_1 15000 300 # 300 = 100+200
customer_1 contract_1 20000 200 # 200
customer_2 contract_2 30000 100 # 100
customer_1 contract_1 7500 800 # 800 = 100+200+500
如果時間戳檢查不會在那裏,它可以做設置由客戶ID和合同ID組成鍵,通過鍵和減少然後一個連接,但給出時間戳比較存在,我沒有找到一個簡單的方法來做到這一點。
我獲得此做以這種方式使用笛卡爾操作第一種方法:
combinations = data.cartesian(data)
.filter(lambda a: a[0][0] == a[1][0] and
a[0][1] == a[1][1] and
a[1][2] >= a[0][2])
agg = combinations.map(lambda a: (a[0], a[1][3])).reduceByKey(lambda x,y: x+y)
結果是正常,但恐怕將笛卡爾到的數據,我管理(大於1的量百萬行)效率很低。實際上,在這裏應用笛卡爾操作會產生許多根本沒有意義的組合(根據定義,將不同客戶或合同的行組合在一起並不合理),這些組合隨後會被過濾器刪除。
理想的情況下,我會做一個groupBy
使用客戶ID和合同ID作爲關鍵,然後,遍歷產生的groupBy
,並將笛卡爾產品應用到每一行。這將大大減少生成的組合數量。但是,我沒有找到任何方法來做到這一點。更甚者,這可能嗎?如果是這樣,怎麼樣?你有關於如何實現我的要求的任何其他建議/想法?
感謝您的幫助!
非常感謝您zero323。我從Spark開始並且不知道Window函數。感謝您的信息。出於好奇,還有一個問題:如果我有timestamp_start和timestamp_end字段,並且我的總結條件是current_row_timestamp_start> = timestamp_start和current_row_timestamp_end <= timestamp_end?我是否也可以使用Window函數來處理這種情況?謝謝! –
不可以。您可以提供覆蓋行或值的靜態範圍,但不能取決於當前行。從理論上講,你可以用滯後/領先來回顧和前瞻,並嘗試從中建立一些東西,但它不可能是漂亮或高效的。 – zero323