I'm not sure how strict the output format needs to be, but the following should get you almost all the way there:
Using PySpark DataFrames:
>>> employee = spark.createDataFrame([(31, ['Raffery', 31, 'a', 'b']), (33, ['Jones', 33, '1', 'b']), (32, ['Heisenberg', 33, 'a', 'b'])], ["id_e", "list_e"])
>>> employee.show(truncate=False)
+----+----------------------+
|id_e|list_e                |
+----+----------------------+
|31  |[Raffery, 31, a, b]   |
|33  |[Jones, 33, 1, b]     |
|32  |[Heisenberg, 33, a, b]|
+----+----------------------+
>>> department = spark.createDataFrame([(31, ['Raffery', 31, 'c', 'b']), (33, ['Jones', 33, 'a', 'b']), (34, ['Heisenberg', 33, 'a', 'b'])], ["id_d", "list_d"])
>>> department.show(truncate=False)
+----+----------------------+
|id_d|list_d                |
+----+----------------------+
|31  |[Raffery, 31, c, b]   |
|33  |[Jones, 33, a, b]     |
|34  |[Heisenberg, 33, a, b]|
+----+----------------------+
Join these on what I assume is the user id:
>>> joined = employee.join(department, employee.id_e == department.id_d)
>>> joined.show()
+----+-------------------+----+-------------------+
|id_e|             list_e|id_d|             list_d|
+----+-------------------+----+-------------------+
|  31|[Raffery, 31, a, b]|  31|[Raffery, 31, c, b]|
|  33|  [Jones, 33, 1, b]|  33|  [Jones, 33, a, b]|
+----+-------------------+----+-------------------+
Then map each user id to the indices of the list elements that differ between the two DataFrames:
>>> joined.rdd.map(lambda row: (row.id_e, [i for i in range(4) if row.list_d[i] != row.list_e[i]])).collect()
[(31, [2]), (33, [2])]
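The lambda in this answer hard-codes range(4), so it assumes every list has exactly four elements. If that may not hold for your data, the comparison could be pulled out into a small helper. The sketch below is plain Python; the name differing_indices and the sample rows are my own stand-ins for illustration, not part of any Spark API, and treating the extra tail of a longer list as "different" is an assumption you may want to change.

```python
def differing_indices(left, right):
    """Return the positions at which two sequences differ.

    Assumption: positions that exist in only one sequence
    (a length mismatch) are reported as differing too.
    """
    shared = min(len(left), len(right))
    diffs = [i for i in range(shared) if left[i] != right[i]]
    # The tail beyond the shorter sequence has no counterpart, so report it.
    diffs.extend(range(shared, max(len(left), len(right))))
    return diffs


# Plain-Python stand-ins for the joined rows: (id, list_e, list_d).
rows = [
    (31, ["Raffery", "31", "a", "b"], ["Raffery", "31", "c", "b"]),
    (33, ["Jones", "33", "1", "b"], ["Jones", "33", "a", "b"]),
]
result = [(row_id, differing_indices(e, d)) for row_id, e, d in rows]
print(result)  # [(31, [2]), (33, [2])]
```

With the joined DataFrame, the same helper should slot into the map step as joined.rdd.map(lambda row: (row.id_e, differing_indices(row.list_e, row.list_d))).collect().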
Hope this gets you on your way. Good luck.