2016-07-16 29 views
1

執行平均操作我有這樣如何在Apache中弗林克使用Scala的

15,Rom,36,49 
16,Weyoun,22,323 
17,Odo,35,13 
18,Jean-Luc,45,455 

的數據集,我想選擇第3列和第4列作爲我的鍵和值,我怎麼能執行Apache的平均操作弗林克。我能夠實現「按鍵分組」。但我無法對每個鍵的值執行平均操作。

val lines: DataSet[String] = env.readTextFile("/home/kiran/Desktop/social_friends.csv") 

val jn = lines.map(line => line.split(",")).map(word => (word(2).toString,word(3).toInt)).groupBy("0") 
+0

Flink中沒有平均運營商。您需要使用「減少」或「聚集」並編寫自定義UDF代碼。 –

+0

我知道。你能告訴我如何使用reduce或aggregate函數來執行它。 –

回答

0

這應該工作

val lines: DataSet[String] = env.readTextFile("/home/kiran/Desktop/social_friends.csv") 

val jn = lines.map(line => line.split(",")).map(word => (word(2).toString, 1,word(3).toDouble)).groupBy(0).reduce { 
    (left, right) => 
     val (key, left1, left2) = left 
     val (_, right1, right2) = right 
     (key, left1 + left2, right1 + right2) 
}.map(tuple => (tuple._1, tuple._3/tuple._2)) 
+0

它拋出一個錯誤**類型不匹配;發現:任何要求:String **靠近left2和right2。你可以請檢查一次。在加法操作 –

3

注意的是,我沒有改變map了。它現在發出一個三元組:

val lines: DataSet[String] = env.readTextFile("/home/kiran/Desktop/social_friends.csv") 

val jn = lines 
    .map(line => line.split(",")) 
    .map(word => (word(2).toString,word(3).toInt,1)) 
    .groupBy("0") 
    .reduce { (left, right) => (left._1, left._2 + right._2, left._3 + right._3) } 
    .map(tuple => (tuple._1, tuple._2/tuple._3)) 
+1

有一個小的更正,最後它應該是tuple._2/tuple._3 –

+0

謝謝!修復! –

+0

謝謝!有效 :) –