2016-07-27 49 views
0

假設我有如下的數據:取N個值從每個分區在火花

val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1)) 
val DataSortRDD = sc.parallelize(DataSort,2) 

現在有兩個分區與:

scala>DataSortRDD.glom().take(2).head 
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4)) 
scala>DataSortRDD.glom().take(2).tail 
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1))) 

假設在每一個分區中的數據已經使用類似sortWithinPartitions(col("src").desc,col("rank").desc)(這是一個數據幀,但只是爲了說明)排序。

我想從每個分區獲得每個字母的前兩個值(如果有超過2個值)。因此,在這個例子中,結果在每個分區應該是:

scala>HypotheticalRDD.glom().take(2).head 
Array(("a",5),("b",13),("b",2),("c",4)) 
scala>HypotheticalRDD.glom().take(2).tail 
Array(Array(("a",1),("b",15),("c",3),("c",2))) 

我知道,我必須使用mapPartition功能,但它在我心中並不清楚知道怎樣才能在每個分區中的值進行迭代,並獲得第一2.任何提示?

編輯:更確切地說,我知道在每個分區中,數據已經先按'字母'排序,然後按'count'排序。所以我的主要想法是mapPartition中的輸入函數應該遍歷分區,並且yield是每個字母的前兩個值。這可以通過檢查每個迭代值來完成。這就是我可以在Python寫:

def limit_on_sorted(iterator): 
    oldKey = None 
    cnt = 0 
    while True: 
     elem = iterator.next() 
     if not elem: 
      return 
     curKey = elem[0] 
     if curKey == oldKey: 
      cnt +=1 
      if cnt >= 2: 
       yield None 
     else: 
      oldKey = curKey 
      cnt = 0 
     yield elem 

DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None) 
+0

不要緊,最終的結果是怎麼_partitioned_?換句話說 - 如果你得到了相同的結果,但分區不同,那還是可以的嗎?如預期的那樣,過濾仍將基於原始分區。 –

回答

1

假設你真的不關心結果的分區,你可以使用mapPartitionsWithIndex納入分區ID成關鍵您groupBy,那麼你可以很容易地把前兩項爲每個這樣的關鍵:

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitionsWithIndex { 
    // add the partition ID into the "key" of every record: 
    case (partitionId, itr) => itr.map { case (k, v) => ((k, partitionId), v) } 
    } 
    .groupByKey() // groups by letter and partition id 
    // take only first two records, and drop partition id 
    .flatMap { case ((k, _), itr) => itr.take(2).toArray.map((k, _)) } 

println(result.collect().toList) 
// prints: 
// List((a,5), (b,15), (b,13), (b,2), (a,1), (c,4), (c,3)) 

請注意,最終的結果(groupByKey改變了分區),我是,假設這對你想要做的事情(坦率地說,逃脫了我)並不關鍵。

編輯:如果你想避免洗牌和每個分區中的所有操作:

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitions(_.toList.groupBy(_._1).mapValues(_.take(2)).values.flatten.iterator, true) 
+0

感謝您的回答。也許我應該在問題中提及它。我想使用'mapPartition'的原因是因爲我想避免出於效率原因在分區之間進行混洗。在你使用'groupByKey'的解決方案中,有洗牌。 –

+0

我明白了。編輯我的答案,包括一個沒有洗牌的解決方案(保留分區) –

+0

您的回答是正確的。我關心的是'groupBy(_._ 1)'。爲什麼當我知道這些值已經按字母和按數字排序後需要分組?我已經更新了我的問題以更清晰地表明我的想法。 –