設置數據幀:減少行元組的陣列,以他們的唯一值與PySpark
data = [
(Row(id='label1'), Row(id='label1')),
(Row(id='label1'), Row(id='label6')),
(Row(id='label6'), Row(id='label1')),
(Row(id='label6'), Row(id='label6'))]
df = sc.parallelize(data)
df.collect()
回報:
[(Row(id='label1'), Row(id='label1')),
(Row(id='label1'), Row(id='label6')),
(Row(id='label6'), Row(id='label1')),
(Row(id='label6'), Row(id='label6'))]
這個數據集被降低,實際的數據集包含更多的標籤。我試圖刪除重複項。要刪除重複項:
[(Row(id='label1'), Row(id='label1')),
(Row(id='label1'), Row(id='label6')),
(Row(id='label6'), Row(id='label1')),
(Row(id='label6'), Row(id='label6'))]
以下應返回:
在其他位置的元組[(Row(id='label1'), Row(id='label6'))]
所以含有相同標籤的行被刪除或相同的標籤被刪除。
最近我有得到的是:
df.map(lambda k : k[0]).toDF().distinct().collect()
返回:
[Row(id='label6'), Row(id='label1')]
但是這行的,而不是排元組數組的數組:[(Row(id='label6'), Row(id='label1'))]
更新:
每行還包含一個SparseVector,我試圖省略關注問題。對每個行的執行比較功能SparseVector
行label1,label1
和label6,label6
不會進行比較,因爲它們包含相同的標籤。 label1,label6
或label6,label1
不作爲比較label1,label6和label6,label1將返回相同的值。
更新2:
data = [
(Row(id='label1'), Row(id='label1')),
(Row(id='label1'), Row(id='label6')),
(Row(id='label6'), Row(id='label1')),
(Row(id='label6'), Row(id='label6'))
(Row(id='label6'), Row(id='label2'))
(Row(id='label3'), Row(id='label3'))]
應該返回
data = [
(Row(id='label1'), Row(id='label6'))
(Row(id='label6'), Row(id='label2'))]
更新3:
from pyspark.sql import Row
data = [(Row(id='label1'), Row(id='label1')),
(Row(id='label1'), Row(id='label6')),
(Row(id='label6'), Row(id='label1')),
(Row(id='label6'), Row(id='label6')),
(Row(id='label6'), Row(id='label2')),
(Row(id='label3'), Row(id='label3'))]
df = sc.parallelize(data)
df.groupByKey()\
.flatMap(lambda (x,y): fn(x,y))\
.map(lambda r: tuple(sorted(r)))\
.distinct()\
.take(5)
[(Row(id='label2'), Row(id='label6')),
(Row(id='label1'), Row(id='label6'))]
回報:
File "<ipython-input-39-dffaf6bcbb80>", line 12
df.groupByKey() .flatMap(lambda (x,y): fn(x,y)) .map(lambda r: tuple(sorted(r))) .distinct() .take(5)
^
SyntaxError: invalid syntax
更新4:
from pyspark.sql import Row
data = [(Row(id='label1'), Row(id='label1')),
(Row(id='label1'), Row(id='label6')),
(Row(id='label6'), Row(id='label1')),
(Row(id='label6'), Row(id='label6')),
(Row(id='label6'), Row(id='label2')),
(Row(id='label3'), Row(id='label3'))]
df = sc.parallelize(data)
df.groupByKey().flatMap(lambda (x,y): fn(x,y)).map(lambda r: tuple(sorted(r))).distinct().take(5)
回報:
File "<ipython-input-302-dd76d08c9530>", line 12
df.groupByKey().flatMap(lambda (x,y): fn(x,y)).map(lambda r: tuple(sorted(r))).distinct().take(5)
^
SyntaxError: invalid syntax
@ blue-sky,你爲什麼放棄這個記錄?(行(id ='label3'),行(id ='label3'),label3沒有出現在任何地方? – Rags
@Rags我放棄記錄,因爲它是在兩個元組位置相同的id。 –