2015-10-28 64 views
-2

我正在開發基於Spark的協作過濾算法,但是我陷入了RDD轉換問題。我有我的輸入RDD:在Spark上創建複合鍵

[「John」,「a」,「5」],[「John」,「b」,「3」],[「John」,「c」 ,「2」,[「Mark」,「a」,「3」] [「Mark」,「b」,「4」] [「Lucy」,「b」,「2」] [「Lucy」, 「c」,「5」]

在每個RDD元素中,第一個值是用戶,第二個值是產品名稱(「a」,「b」或「c」),第三個值是它的評價。

我想變換通過按名稱分組RDD輸入,然後通過產品的組合,所以我的最終結果RDD將是

[( 「A」, 「B」),( 「5」 ,「2」)] [(「a」,「b」),(「3」,「4」)] [(「a」,「c」),(「5」,「2」)]

在上面的結果中,因爲約翰和馬克對a和b都有「評級」,所以我有兩個RDD元素,其中(a,b)爲關鍵,他們的評級爲值。只有John對a和c都有評級,因此我只有一個RDD元素,其中(a,c)是關鍵。

+0

我對「如果我輸入'a'」這個短語有點困惑。你能詳細說明一下還是發佈一些代碼? –

+0

對不起,我感到困惑。這一點並不重要,我刪除了這一行, –

+1

這還不清楚... – eliasah

回答

1

你可以做類似如下:

val keyedElems = rdd1.map { case (a, b, c) => (a, (b, c)) } 
val groupedCombinations = keyedElems.groupByKey().flatMapValues(_.toList.combinations(2)) 
val productScoreCombinations = groupedCombinations.mapValues { case (elems: List[(String, String)]) => ((elems(0)._1, elems(1)._1), (elems(0)._2, elems(1)._2)) }.values 

我們這裏做的是密鑰由用戶的輸入數據集,生產的(產品等級)的迭代名單由關鍵組,生產2每個列表的組合,將每個組合放到自己的記錄中,最後對元素進行重新排序,以在他們自己的元組中擁有產品和評級。

當星火本地運行,我看到以下內容:

scala> val rdd1 = sc.parallelize(Array(("John", "a", "5"),("John", "b", "3"),("John", "c", "2"),("Mark", "a", "3"),("Mark", "b", "4"),("Lucy", "b", "2"),("Lucy", "c", "5"))) 
rdd1: org.apache.spark.rdd.RDD[(String, String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:21 

scala> val rdd2 = rdd1.map { case (a, b, c) => (a, (b, c)) } 
rdd2: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[1] at map at <console>:23 

scala> val rdd3 = rdd2.groupByKey().flatMapValues(_.toList.combinations(2)) 
rdd3: org.apache.spark.rdd.RDD[(String, List[(String, String)])] = MapPartitionsRDD[3] at flatMapValues at <console>:25 

scala> val rdd4 = rdd3.mapValues { case (elems: List[(String, String)]) => ((elems(0)._1, elems(1)._1), (elems(0)._2, elems(1)._2)) }.values 
rdd4: org.apache.spark.rdd.RDD[((String, String), (String, String))] = MapPartitionsRDD[7] at values at <console>:27 

scala> rdd4.foreach(println) 
... 
((a,b),(3,4)) 
((b,c),(2,5)) 
((a,b),(5,3)) 
((a,c),(5,2)) 
((b,c),(3,2)) 

您可以在此運行一個簡單的過濾器來查找與產品「A」的所有行。

(編輯:)

我錯過了你有這個標記作爲pyspark所以我下面一個Python的解決方案(從上面的斯卡拉一個基本映射)更新:

import itertools 

keyedElems = input.map(lambda x: (x[0], (x[1], x[2]))) 
groupedCombinations = keyedElems.groupByKey().flatMapValues(lambda arr: itertools.combinations(arr, 2)) 
productScoreCombinations = groupedCombinations.mapValues(lambda elems: ((elems[0][0], elems[1][0]), (elems[0][1], elems[1][1]))).map(lambda x: x[1]) 

當我運行上面的代碼,我在pyspark看到以下內容:

>>> input = sc.parallelize([("John", "a", "5"),("John", "b", "3"),("John", "c", "2"),("Mark", "a", "3"),("Mark", "b", "4"),("Lucy", "b", "2"),("Lucy", "c", "5")]) 
... 
>>> productScoreCombinations.take(6) 
... 
[(('b', 'c'), ('2', '5')), (('a', 'b'), ('5', '3')), (('a', 'c'), ('5', '2')), (('b', 'c'), ('3', '2')), (('a', 'b'), ('3', '4'))] 
+0

謝謝。我正在使用pyspark,所以我想弄清楚你的想法。一個問題,不會rdd.groupByKey()然後flatMapValues()發出相同的rdd? –

+0

根據我的理解,您將用戶組合在一起,然後查找該用戶擁有的所有產品組合。輸出產品組合作爲關鍵,並將它們各自的評級作爲關鍵?那是對的嗎? –

+0

@SYZ對於你的第一個問題,如果身份函數被傳遞給'flatMapValues()',那麼是的。然而,在這裏我們傳遞一個函數來創建分組元組的組合。對於你的第二個問題,如果你的意思是「各自的評級作爲價值」,你基本上是正確的:) :) –