2015-10-18

PySpark - creating pairs of keys that share the same value

I have a key-value pair RDD where each key is an actor and each value is a movie that actor appeared in, in the form:

["actor 1", "movie 1"] 
["actor 1", "movie 2"] 
["actor 1", "movie 3"] 
... 
["actor n", "movie 2"] 

I want to map it to another key-value pair RDD in which each pair consists of two actors who appeared in a movie together.

In the example above, this means the new RDD would contain the pair ["actor 1", "actor n"], since both appeared in "movie 2".

Answers


A simple swap and join should do the trick. First, let's create some dummy data and a small helper function:

actor_movie = sc.parallelize([ 
    ("actor 1", "movie 1"), 
    ("actor 1", "movie 2"), 
    ("actor 1", "movie 3"), 
    ("actor n", "movie 2") 
]) 

swap = lambda x: (x[1], x[0]) 

Next, swap the order so the movie becomes the key:

movie_actor = (actor_movie.map(swap) 
    .partitionBy(actor_movie.getNumPartitions()) 
    .cache()) 

and join:

(movie_actor 
    .join(movie_actor) # Join by movie 
    .values() # Extract values (actors) 
    .filter(lambda x: x[0] != x[1])) 
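
For reference, the same swap-and-join logic can be sketched in plain Python without Spark (a minimal illustration, assuming the input is a list of (actor, movie) tuples):

```python
from collections import defaultdict

def actor_pairs(actor_movie):
    """Group actors by movie (the 'swap'), then emit every ordered
    pair of distinct actors sharing a movie (the self-'join')."""
    movie_actors = defaultdict(list)
    for actor, movie in actor_movie:
        movie_actors[movie].append(actor)  # swapped: movie -> actors
    pairs = []
    for actors in movie_actors.values():
        for a in actors:
            for b in actors:
                if a != b:  # mirrors .filter(lambda x: x[0] != x[1])
                    pairs.append((a, b))
    return pairs

data = [("actor 1", "movie 1"), ("actor 1", "movie 2"),
        ("actor 1", "movie 3"), ("actor n", "movie 2")]
print(actor_pairs(data))
# [('actor 1', 'actor n'), ('actor n', 'actor 1')]
```

Note that, exactly like the RDD self-join, each pair of co-stars appears in both orders.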

Exactly what I needed, thanks! – nikos


It's not exactly what you asked for, but I think it's close enough:

import itertools 

movies = sc.parallelize([("P", "SW4"), ("P", "SW5"), ("P", "SW6"), 
                         ("A", "SW4"), ("A", "SW5"), 
                         ("B", "SW5"), ("B", "SW6"), 
                         ("W", "SW4"), 
                         ("X", "SW1"), ("X", "SW7"), ("X", "SW2"), ("X", "SW3"), 
                         ("Y", "SW1"), ("Y", "SW7"), ("Y", "SW2"), ("Y", "SW3")]) 

swap_tuple = lambda kv: (kv[1], kv[0]) 

# Swap to (movie, actor), then collect each movie's actors into a list 
movies = movies.map(swap_tuple).groupByKey().mapValues(list) 

# Emit (movie, actor_pair) for every unordered pair of co-stars 
all_pairs = movies.flatMap( 
    lambda mv: [(mv[0], pair) for pair in itertools.combinations(mv[1], 2)]) 

print(all_pairs.collect()) 

""" 
    >> [('SW1', ('X', 'Y')), 
     ('SW3', ('X', 'Y')), 
     ('SW5', ('P', 'A')), 
     ('SW5', ('P', 'B')), 
     ('SW5', ('A', 'B')), 
     ('SW7', ('X', 'Y')), 
     ('SW2', ('X', 'Y')), 
     ('SW4', ('P', 'A')), 
     ('SW4', ('P', 'W')), 
     ('SW4', ('A', 'W')), 
     ('SW6', ('P', 'B'))] 
""" 

Here is a runnable version as an .ipynb notebook.