You can do something like the following:
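// rdd1: RDD[(user, product, rating)]; key each row by user
val keyedElems = rdd1.map { case (a, b, c) => (a, (b, c)) }
// group each user's (product, rating) pairs, then emit every 2-element combination as its own record
val groupedCombinations = keyedElems.groupByKey().flatMapValues(_.toList.combinations(2))
// reshape each List((p1, r1), (p2, r2)) into ((p1, p2), (r1, r2)) and drop the user key
val productScoreCombinations = groupedCombinations.mapValues { case (elems: List[(String, String)]) => ((elems(0)._1, elems(1)._1), (elems(0)._2, elems(1)._2)) }.values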
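What we're doing here is keying the input dataset by user, grouping by key to produce an iterable list of (product, rating) pairs for each user, generating every 2-element combination of each list, putting each combination into its own record, and finally reordering the elements so that the products and the ratings each sit in their own tuple.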
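To make those four steps concrete without a cluster, here is a minimal plain-Python sketch. It assumes the sample rows from the REPL session that follows; `product_score_combinations` is a hypothetical helper, and a `dict` of lists stands in for `groupByKey`, so this illustrates the logic only, not the Spark API.

```python
from collections import defaultdict
from itertools import combinations

# Sample rows from the REPL session: (user, product, rating).
rows = [("John", "a", "5"), ("John", "b", "3"), ("John", "c", "2"),
        ("Mark", "a", "3"), ("Mark", "b", "4"),
        ("Lucy", "b", "2"), ("Lucy", "c", "5")]

def product_score_combinations(rows):
    """Hypothetical plain-Python stand-in for the RDD pipeline (no Spark)."""
    # Steps 1 and 2: key by user and collect each user's (product, rating)
    # pairs, mimicking map(...) followed by groupByKey().
    grouped = defaultdict(list)
    for user, product, rating in rows:
        grouped[user].append((product, rating))
    result = []
    for pairs in grouped.values():
        # Step 3: every 2-element combination of the user's list becomes
        # its own record (the flatMapValues step).
        for (p1, r1), (p2, r2) in combinations(pairs, 2):
            # Step 4: regroup so products and ratings sit in their own tuples.
            result.append(((p1, p2), (r1, r2)))
    return result

for record in product_score_combinations(rows):
    print(record)
```

It yields the same five records as the Spark run; only the order may differ, since Spark does not promise an output order.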
Running the Scala version in a local Spark shell, I see the following:
scala> val rdd1 = sc.parallelize(Array(("John", "a", "5"),("John", "b", "3"),("John", "c", "2"),("Mark", "a", "3"),("Mark", "b", "4"),("Lucy", "b", "2"),("Lucy", "c", "5")))
rdd1: org.apache.spark.rdd.RDD[(String, String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val rdd2 = rdd1.map { case (a, b, c) => (a, (b, c)) }
rdd2: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[1] at map at <console>:23
scala> val rdd3 = rdd2.groupByKey().flatMapValues(_.toList.combinations(2))
rdd3: org.apache.spark.rdd.RDD[(String, List[(String, String)])] = MapPartitionsRDD[3] at flatMapValues at <console>:25
scala> val rdd4 = rdd3.mapValues { case (elems: List[(String, String)]) => ((elems(0)._1, elems(1)._1), (elems(0)._2, elems(1)._2)) }.values
rdd4: org.apache.spark.rdd.RDD[((String, String), (String, String))] = MapPartitionsRDD[7] at values at <console>:27
scala> rdd4.foreach(println)
...
((a,b),(3,4))
((b,c),(2,5))
((a,b),(5,3))
((a,c),(5,2))
((b,c),(3,2))
You can then run a simple filter on this to find all rows involving product "a".
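As a sketch of that filter, here is the predicate applied to the records printed in the REPL session, using a plain Python list in place of the RDD. `rows_with_product` is a hypothetical helper; on the RDD itself, the same predicate would presumably go into `filter`, e.g. `productScoreCombinations.filter(lambda x: "a" in x[0])` in PySpark, which I have not run here.

```python
# Records as printed by rdd4.foreach(println) in the REPL session.
records = [(("a", "b"), ("3", "4")), (("b", "c"), ("2", "5")),
           (("a", "b"), ("5", "3")), (("a", "c"), ("5", "2")),
           (("b", "c"), ("3", "2"))]

def rows_with_product(records, product):
    """Keep records whose product pair contains `product` (like rdd.filter)."""
    return [rec for rec in records if product in rec[0]]

for rec in rows_with_product(records, "a"):
    print(rec)
```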
(Edit:)
I missed that you had this tagged as pyspark, so I've updated below with a Python solution (basically a direct mapping of the Scala one above):
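import itertools
# input: RDD of (user, product, rating) tuples; key each row by user
keyedElems = input.map(lambda x: (x[0], (x[1], x[2])))
# group each user's (product, rating) pairs and emit every 2-element combination as its own record
groupedCombinations = keyedElems.groupByKey().flatMapValues(lambda arr: itertools.combinations(arr, 2))
# reshape ((p1, r1), (p2, r2)) into ((p1, p2), (r1, r2)), then keep only the value (drop the user key)
productScoreCombinations = groupedCombinations.mapValues(lambda elems: ((elems[0][0], elems[1][0]), (elems[0][1], elems[1][1]))).map(lambda x: x[1])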
When I run the code above in pyspark, I see the following:
>>> input = sc.parallelize([("John", "a", "5"),("John", "b", "3"),("John", "c", "2"),("Mark", "a", "3"),("Mark", "b", "4"),("Lucy", "b", "2"),("Lucy", "c", "5")])
...
>>> productScoreCombinations.take(6)
...
[(('b', 'c'), ('2', '5')), (('a', 'b'), ('5', '3')), (('a', 'c'), ('5', '2')), (('b', 'c'), ('3', '2')), (('a', 'b'), ('3', '4'))]
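One caveat worth hedging: as far as I know, `groupByKey` makes no guarantee about the order of values within a group, so the same user could produce `('b', 'a')` instead of `('a', 'b')` on another run. A minimal sketch of one way around this, assuming you want canonical product pairs, is to sort each group before combining (in PySpark presumably `itertools.combinations(sorted(arr), 2)` inside `flatMapValues`). `canonical_pairs` is a hypothetical helper shown in plain Python:

```python
from itertools import combinations

def canonical_pairs(pairs):
    """Sort (product, rating) pairs before combining so each product pair
    always comes out in the same order, e.g. ('a', 'b') and never ('b', 'a')."""
    ordered = sorted(pairs)  # sorts by product, then by rating
    return [((p1, p2), (r1, r2)) for (p1, r1), (p2, r2) in combinations(ordered, 2)]

# The same user's values arriving in two different orders.
print(canonical_pairs([("b", "3"), ("a", "5")]))
print(canonical_pairs([("a", "5"), ("b", "3")]))
```

Both calls print the same single record, so a later filter or aggregation by product pair sees one key per pair.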
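I'm a bit confused by the phrase "if I input 'a'". Could you elaborate or post some code? –
Sorry for the confusion. That point isn't important, so I've removed that line. –
This is still unclear... – eliasah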