假設我有如下的數據:取N個值從每個分區在火花
val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1))
val DataSortRDD = sc.parallelize(DataSort,2)
現在有兩個分區與:
scala>DataSortRDD.glom().take(2).head
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4))
scala>DataSortRDD.glom().take(2).tail
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1)))
假設在每一個分區中的數據已經使用類似sortWithinPartitions(col("src").desc,col("rank").desc)
(這是一個數據幀,但只是爲了說明)排序。
我想從每個分區獲得每個字母的前兩個值(如果有超過2個值)。因此,在這個例子中,結果在每個分區應該是:
scala>HypotheticalRDD.glom().take(2).head
Array(("a",5),("b",13),("b",2),("c",4))
scala>HypotheticalRDD.glom().take(2).tail
Array(Array(("a",1),("b",15),("c",3),("c",2)))
我知道,我必須使用mapPartition
功能,但它在我心中並不清楚知道怎樣才能在每個分區中的值進行迭代,並獲得第一2.任何提示?
編輯:更確切地說,我知道在每個分區中,數據已經先按'字母'排序,然後按'count'排序。所以我的主要想法是mapPartition
中的輸入函數應該遍歷分區,並且yield
是每個字母的前兩個值。這可以通過檢查每個迭代值來完成。這就是我可以在Python寫:
def limit_on_sorted(iterator):
oldKey = None
cnt = 0
while True:
elem = iterator.next()
if not elem:
return
curKey = elem[0]
if curKey == oldKey:
cnt +=1
if cnt >= 2:
yield None
else:
oldKey = curKey
cnt = 0
yield elem
DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None)
不要緊,最終的結果是怎麼_partitioned_?換句話說 - 如果你得到了相同的結果,但分區不同,那還是可以的嗎?如預期的那樣,過濾仍將基於原始分區。 –