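Spark aggregateByKey using Map and defining data types for functions
2015-09-02

I realize the title is probably confusing, so I'll explain as best I can. I'm trying to break this functionality out into named functions, both to understand better how aggregateByKey works and so that the other teams who will be writing code against mine can follow it. I have the following aggregation: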

val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
     (accumCount, value) => accumCount.get(value.head) match { 
     case None => accumCount + (value.head -> 1) 
     case Some(count) => accumCount + (value.head -> (count + 1)) 
     }, 
     (accum1, accum2) => accum1 ++ accum2.map{case(k,v) => k -> (v + accum1.getOrElse(k, 0))} 
    ).collect() 

I've been trying to break it up like this:

val firstLet = Map[Char, Int]()
def fSeq(accumCount: ?, value: ?) = {
  accumCount.get(value.head) match {
    case None => accumCount + (value.head -> 1)
    case Some(count) => accumCount + (value.head -> (count + 1))
  }
}
def fComb(accum1: ?, accum2: ?) = {
  accum1 ++ accum2.map { case (k, v) => k -> (v + accum1.getOrElse(k, 0)) }
}

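Since the initial value is a Map[Char, Int], I'm not sure what data types to declare for accumCount and value. I've tried various things, but nothing I tried worked. Can someone help me define the data types and explain how you figure them out?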


And what is the input here? 'RDD[(T, String)]'? – zero323

Answer

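  • seqOp takes, as its first argument, an accumulator of the same type as the initial value, and as its second argument a value of the same type as the values in the RDD.
  • combOp takes two accumulators, both of the same type as the initial value.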
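The two typing rules above can be checked without a cluster. The sketch below is a plain-Scala stand-in for aggregateByKey, not Spark's implementation: it assumes the RDD is modeled as a Seq of partitions, folds each key's values with seqOp inside a partition, then merges the per-partition results with combOp. The name localAggregateByKey is hypothetical; its type parameters spell out exactly which argument has which type.

```scala
// Hypothetical local stand-in for RDD[(K, V)].aggregateByKey (no Spark needed).
// The type parameters encode the rules:
//   zero:   A             (the initial value fixes the accumulator type)
//   seqOp:  (A, V) => A   (accumulator first, one RDD value second)
//   combOp: (A, A) => A   (two accumulators of the initial value's type)
// It reuses `zero` as-is, so it is only safe with immutable accumulators.
def localAggregateByKey[K, V, A](partitions: Seq[Seq[(K, V)]])(zero: A)(
    seqOp: (A, V) => A,
    combOp: (A, A) => A): Map[K, A] = {
  // Step 1: inside each partition, fold every key's values starting from zero.
  val perPartition: Seq[Map[K, A]] = partitions.map { part =>
    part.groupBy(_._1).map { case (k, kvs) =>
      k -> kvs.map(_._2).foldLeft(zero)(seqOp)
    }
  }
  // Step 2: merge the partial accumulators for each key across partitions.
  perPartition.flatten.groupBy(_._1).map { case (k, kas) =>
    k -> kas.map(_._2).reduce(combOp)
  }
}

// Example: count first letters per key, with the input split over two partitions.
val demo: Map[Int, Map[Char, Int]] = localAggregateByKey(
  Seq(
    Seq(1 -> "apple", 1 -> "avocado", 2 -> "banana"),
    Seq(1 -> "berry", 2 -> "blueberry")))(Map.empty[Char, Int])(
  (acc, s) => acc + (s.head -> (acc.getOrElse(s.head, 0) + 1)),
  (a1, a2) => a1 ++ a2.map { case (k, v) => k -> (v + a1.getOrElse(k, 0)) })
```

Because K, V and A are fixed by the first two parameter lists, the compiler can infer the lambda parameter types in the third list, which is one practical way to find out what the types must be.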

Assuming you want to aggregate an RDD[(T, U)]:

def fSeq(accumCount: Map[Char, Int], value: U): Map[Char, Int] = ??? 
def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = ??? 

I guess in your case U is simply String, so you should adjust the fSeq signature accordingly.
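As a sketch, assuming the input really is RDD[(T, String)] as zero323 asked, the question's functions type-check once the placeholders are filled in as below. The bodies are the asker's original match-based logic; only the parameter and return types are added.

```scala
// The question's seqOp with types filled in: the accumulator has the zero
// value's type (Map[Char, Int]) and the value is one String from the RDD.
// Like the original, value.head throws on an empty string.
def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] =
  accumCount.get(value.head) match {
    case None        => accumCount + (value.head -> 1)
    case Some(count) => accumCount + (value.head -> (count + 1))
  }

// The question's combOp: both arguments are accumulators, so both have the
// zero value's type, and so does the result.
def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] =
  accum1 ++ accum2.map { case (k, v) => k -> (v + accum1.getOrElse(k, 0)) }

// With Spark (not run here): stringRDD.aggregateByKey(Map[Char, Int]())(fSeq, fComb)
```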

BTW, you can use a map with a default value and simplify the functions:

val firstLet = Map[Char, Int]().withDefault(x => 0) 

def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] = { 
    accumCount + (value.head -> (accumCount(value.head) + 1)) 
} 

def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = { 
    val accum = (accum1.keys ++ accum2.keys).map(k => (k, accum1(k) + accum2(k))) 
    accum.toMap.withDefault(x => 0) 
} 
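The simplified fSeq relies on the default surviving each update. In the Scala 2.12/2.13 standard library, withDefault on an immutable Map returns a Map.WithDefault wrapper whose + (via updated) returns another defaulted map, so accumCount(value.head) keeps returning 0 for unseen letters. Rebuilding a map with .toMap, as fComb does, produces a plain map without the default, which is presumably why fComb re-applies withDefault. A small check of both points:

```scala
// Assumes the Scala 2.12/2.13 standard library, where Map#withDefault returns a
// Map.WithDefault wrapper whose `+` produces another defaulted map.
val firstLet: Map[Char, Int] = Map[Char, Int]().withDefault(_ => 0)

// Same body as the simplified fSeq above.
def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] =
  accumCount + (value.head -> (accumCount(value.head) + 1))

// After one update the default is still there: unseen keys read as 0.
val afterOne: Map[Char, Int] = fSeq(firstLet, "apple")

// Rebuilding from a sequence of pairs yields a plain Map with no default,
// so looking up a missing key throws NoSuchElementException.
val rebuilt: Map[Char, Int] = afterOne.toSeq.toMap
```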

Finally, it can be more efficient to use scala.collection.mutable.Map:

import scala.collection.mutable.{Map => MMap} 

def firstLetM = MMap[Char, Int]().withDefault(x => 0) 

def fSeqM(accumCount: MMap[Char, Int], value: String): MMap[Char, Int] = { 
    accumCount += (value.head -> (accumCount(value.head) + 1)) 
} 

def fCombM(accum1: MMap[Char, Int], accum2: MMap[Char, Int]): MMap[Char, Int] = { 
    accum2.foreach{case (k, v) => accum1 += (k -> (accum1(k) + v))} 
    accum1 
} 
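Mutating the accumulator is allowed here: Spark's documentation for aggregateByKey says that, to avoid memory allocation, both functions may modify and return their first argument instead of creating a new value. The sketch below exercises the mutable helpers locally on one key split across two hypothetical partitions (part1 and part2 are illustrative names, not Spark API). Each partition starts from its own fresh zero; since firstLetM is a def, every call builds a new map.

```scala
import scala.collection.mutable.{Map => MMap}

// Copies of the mutable helpers above.
def firstLetM: MMap[Char, Int] = MMap[Char, Int]().withDefault(_ => 0)

// Updates the accumulator in place and returns it.
def fSeqM(accumCount: MMap[Char, Int], value: String): MMap[Char, Int] =
  accumCount += (value.head -> (accumCount(value.head) + 1))

// Adds accum2's counts into accum1 in place and returns accum1.
def fCombM(accum1: MMap[Char, Int], accum2: MMap[Char, Int]): MMap[Char, Int] = {
  accum2.foreach { case (k, v) => accum1 += (k -> (accum1(k) + v)) }
  accum1
}

// Hypothetical partitions for a single key, each folded from a fresh zero.
// Sharing one mutable zero between the two folds would double-count.
val part1 = Seq("apple", "avocado", "banana").foldLeft(firstLetM)(fSeqM)
val part2 = Seq("berry", "cherry").foldLeft(firstLetM)(fSeqM)

// fCombM mutates part1; convert to an immutable Map for comparison.
val merged: Map[Char, Int] = fCombM(part1, part2).toMap
```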

Test:

def randomChar() = (scala.util.Random.nextInt.abs % 58 + 65).toChar 
def randomString() = { 
    (Seq(randomChar) ++ Iterator.iterate(randomChar)(_ => randomChar) 
     .takeWhile(_ => scala.util.Random.nextFloat > 0.1)).mkString 
} 

val stringRDD = sc.parallelize(
    (1 to 500000).map(_ => (scala.util.Random.nextInt.abs % 60, randomString)))


val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
    (accumCount, value) => accumCount.get(value.head) match { 
    case None => accumCount + (value.head -> 1) 
    case Some(count) => accumCount + (value.head -> (count + 1)) 
    }, 
    (accum1, accum2) => accum1 ++ accum2.map{ 
    case(k,v) => k -> (v + accum1.getOrElse(k, 0))} 
).collectAsMap() 

val firstLetter2 = stringRDD 
    .aggregateByKey(firstLet)(fSeq, fComb) 
    .collectAsMap 

val firstLetter3 = stringRDD 
    .aggregateByKey(firstLetM)(fSeqM, fCombM) 
    .mapValues(_.toMap) 
    .collectAsMap 


firstLetter == firstLetter2
firstLetter == firstLetter3
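The same comparison can be tried without a cluster. This is a local approximation, not Spark: stringRDD is replaced by a small hypothetical two-partition Seq[(Int, String)], and the hypothetical helper run folds each key within a partition from a fresh zero, then merges partitions with combOp. Checking r1 == r2 and r1 == r3 mirrors the firstLetter comparisons above on fixed, reproducible input.

```scala
import scala.collection.mutable.{Map => MMap}

// Original match-based functions from the question.
def seqOrig(acc: Map[Char, Int], s: String): Map[Char, Int] =
  acc.get(s.head) match {
    case None        => acc + (s.head -> 1)
    case Some(count) => acc + (s.head -> (count + 1))
  }
def combOrig(a1: Map[Char, Int], a2: Map[Char, Int]): Map[Char, Int] =
  a1 ++ a2.map { case (k, v) => k -> (v + a1.getOrElse(k, 0)) }

// Default-map version from the answer.
def seqDef(acc: Map[Char, Int], s: String): Map[Char, Int] =
  acc + (s.head -> (acc(s.head) + 1))
def combDef(a1: Map[Char, Int], a2: Map[Char, Int]): Map[Char, Int] =
  (a1.keys ++ a2.keys).map(k => (k, a1(k) + a2(k))).toMap.withDefault(_ => 0)

// Mutable version from the answer.
def seqMut(acc: MMap[Char, Int], s: String): MMap[Char, Int] =
  acc += (s.head -> (acc(s.head) + 1))
def combMut(a1: MMap[Char, Int], a2: MMap[Char, Int]): MMap[Char, Int] = {
  a2.foreach { case (k, v) => a1 += (k -> (a1(k) + v)) }
  a1
}

// Hypothetical two-partition input standing in for stringRDD.
val partitions: Seq[Seq[(Int, String)]] = Seq(
  Seq(1 -> "apple", 1 -> "avocado", 2 -> "banana"),
  Seq(1 -> "berry", 2 -> "blueberry", 2 -> "cherry"))

// Fold each key per partition from a fresh zero, then merge across partitions.
def run[A](zero: () => A, seqOp: (A, String) => A, combOp: (A, A) => A): Map[Int, A] =
  partitions
    .flatMap(_.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero())(seqOp) })
    .groupBy(_._1)
    .map { case (k, kas) => k -> kas.map(_._2).reduce(combOp) }

val r1 = run[Map[Char, Int]](() => Map[Char, Int](), seqOrig, combOrig)
val r2 = run[Map[Char, Int]](() => Map[Char, Int]().withDefault(_ => 0), seqDef, combDef)
val r3 = run[MMap[Char, Int]](() => MMap[Char, Int]().withDefault(_ => 0), seqMut, combMut)
  .map { case (k, m) => k -> m.toMap }
```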

Thanks for the additional update; it makes the code easier to read and use. – theMadKing


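BTW, this code doesn't work correctly. The old code counted all the keys and their counts, but for whatever reason this code stops at a count of 50. My old code doesn't do that! – theMadKing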


Could you provide sample input? – zero323
