2016-12-02 58 views

Compare each value of two RDDs in PySpark

I have two RDDs. For example:

employee = [(31, ['Raffery', 31, 'a', 'b']),
            (33, ['Jones', 33, '1', 'b']),
            (32, ['Heisenberg', 33, 'a', 'b']),
            (37, ['Robinson', 34, 'c', 'cc']),
            (38, ['Smith', 34, 'a', 'b'])]

department = [(31, ['Raffery', 31, 'c', 'b']),
              (33, ['Jones', 33, 'a', 'b']),
              (34, ['Heisenberg', 33, 'a', 'b'])]

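I want to compare the elements of the first RDD with those of the second, key by key.

The output should look like:

31 and the fault is in e[1][2]

33 and the fault is in e[1][2]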

Answer


I'm not sure how strictly the output has to match that exact format, but the following should get you most of the way there.

Using PySpark DataFrames:

>>> employee = spark.createDataFrame([(31, ['Raffery', 31, 'a', 'b']), (33, ['Jones', 33, '1', 'b']), (32, ['Heisenberg', 33, 'a', 'b'])], ["id_e", "list_e"]) 
>>> employee.show(truncate=False)
+----+----------------------+
|id_e|list_e                |
+----+----------------------+
|31  |[Raffery, 31, a, b]   |
|33  |[Jones, 33, 1, b]     |
|32  |[Heisenberg, 33, a, b]|
+----+----------------------+

>>> department = spark.createDataFrame([(31, ['Raffery', 31, 'c', 'b']), (33, ['Jones', 33, 'a', 'b']), (34, ['Heisenberg', 33, 'a', 'b'])], ["id_d", "list_d"]) 
>>> department.show(truncate=False)
+----+----------------------+
|id_d|list_d                |
+----+----------------------+
|31  |[Raffery, 31, c, b]   |
|33  |[Jones, 33, a, b]     |
|34  |[Heisenberg, 33, a, b]|
+----+----------------------+
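Note that only ids 31 and 33 occur in both DataFrames, so an inner join on the id should keep just those two rows. A quick plain-Python check of that, with no Spark involved (the variable names here are only illustrative):

```python
employee_rows = [(31, ['Raffery', 31, 'a', 'b']),
                 (33, ['Jones', 33, '1', 'b']),
                 (32, ['Heisenberg', 33, 'a', 'b'])]

department_rows = [(31, ['Raffery', 31, 'c', 'b']),
                   (33, ['Jones', 33, 'a', 'b']),
                   (34, ['Heisenberg', 33, 'a', 'b'])]

# Index each side by id, then keep only the ids present on both sides,
# which are the rows an inner join would retain.
emp_by_id = dict(employee_rows)
dep_by_id = dict(department_rows)
common_ids = sorted(emp_by_id.keys() & dep_by_id.keys())

print(common_ids)  # [31, 33]
```

Ids 32 and 34 each appear on one side only, so there is nothing to compare them against.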

Join these on what I assume is the user id:

>>> joined = employee.join(department, employee.id_e == department.id_d)
>>> joined.show()
+----+-------------------+----+-------------------+
|id_e|             list_e|id_d|             list_d|
+----+-------------------+----+-------------------+
|  31|[Raffery, 31, a, b]|  31|[Raffery, 31, c, b]|
|  33|  [Jones, 33, 1, b]|  33|  [Jones, 33, a, b]|
+----+-------------------+----+-------------------+

Then map each joined row to the list of indices at which the two lists differ:

>>> joined.rdd.map(lambda row: (row.id_e, [i for i in range(4) if row.list_d[i] != row.list_e[i]])).collect()
[(31, [2]), (33, [2])]
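The `range(4)` in the map assumes every list has exactly four elements. Here is a slightly more general sketch in plain Python, assuming the two lists for a given id always have the same length. `diff_indices` and `describe` are hypothetical helper names, and the message format is only my reading of what the question asks for:

```python
def diff_indices(left, right):
    """Return the positions at which two equal-length lists differ."""
    return [i for i, (a, b) in enumerate(zip(left, right)) if a != b]


def describe(key, indices):
    """Build one message per differing index, mimicking the question's output."""
    return ["{} and the fault is in e[1][{}]".format(key, i) for i in indices]


# Joined rows written out by hand as (id, list_e, list_d).
rows = [(31, ['Raffery', 31, 'a', 'b'], ['Raffery', 31, 'c', 'b']),
        (33, ['Jones', 33, '1', 'b'], ['Jones', 33, 'a', 'b'])]

result = [(key, diff_indices(e, d)) for key, e, d in rows]
print(result)  # [(31, [2]), (33, [2])]

for key, indices in result:
    for line in describe(key, indices):
        print(line)
```

In Spark the same helper could be passed to the map, e.g. `joined.rdd.map(lambda row: (row.id_e, diff_indices(row.list_e, row.list_d)))`.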

Hope that gets you on your way. Good luck!