2017-04-05 17 views
1

設置數據幀:減少行元組的陣列,以他們的唯一值與PySpark

data = [ 
(Row(id='label1'), Row(id='label1')), 
(Row(id='label1'), Row(id='label6')), 
(Row(id='label6'), Row(id='label1')), 
(Row(id='label6'), Row(id='label6'))] 

df = sc.parallelize(data) 

df.collect() 

回報:

[(Row(id='label1'), Row(id='label1')), 
(Row(id='label1'), Row(id='label6')), 
(Row(id='label6'), Row(id='label1')), 
(Row(id='label6'), Row(id='label6'))] 

這個數據集被降低,實際的數據集包含更多的標籤。我試圖刪除重複項。要刪除重複項:

[(Row(id='label1'), Row(id='label1')), 
(Row(id='label1'), Row(id='label6')), 
(Row(id='label6'), Row(id='label1')), 
(Row(id='label6'), Row(id='label6'))] 

以下應返回:

在其他位置的元組
[(Row(id='label1'), Row(id='label6'))] 

所以含有相同標籤的行被刪除或相同的標籤被刪除。

最近我有得到的是:

df.map(lambda k : k[0]).toDF().distinct().collect() 

返回:

[Row(id='label6'), Row(id='label1')] 

但是這行的,而不是排元組數組的數組:[(Row(id='label6'), Row(id='label1'))]

更新:

每行還包含一個SparseVector,我試圖省略關注問題。對每個行的執行比較功能SparseVectorlabel1,label1label6,label6不會進行比較,因爲它們包含相同的標籤。 label1,label6label6,label1不作爲比較label1,label6和label6,label1將返回相同的值。

更新2:

data = [ 
(Row(id='label1'), Row(id='label1')), 
(Row(id='label1'), Row(id='label6')), 
(Row(id='label6'), Row(id='label1')), 
(Row(id='label6'), Row(id='label6')) 
(Row(id='label6'), Row(id='label2')) 
(Row(id='label3'), Row(id='label3'))] 

應該返回

data = [ 
(Row(id='label1'), Row(id='label6')) 
(Row(id='label6'), Row(id='label2'))] 

更新3:

from pyspark.sql import Row 

data = [(Row(id='label1'), Row(id='label1')), 
     (Row(id='label1'), Row(id='label6')), 
     (Row(id='label6'), Row(id='label1')), 
     (Row(id='label6'), Row(id='label6')), 
     (Row(id='label6'), Row(id='label2')), 
     (Row(id='label3'), Row(id='label3'))] 

df = sc.parallelize(data) 

df.groupByKey()\ 
    .flatMap(lambda (x,y): fn(x,y))\ 
    .map(lambda r: tuple(sorted(r)))\ 
    .distinct()\ 
    .take(5) 

[(Row(id='label2'), Row(id='label6')), 
(Row(id='label1'), Row(id='label6'))] 

回報:

File "<ipython-input-39-dffaf6bcbb80>", line 12 
    df.groupByKey() .flatMap(lambda (x,y): fn(x,y)) .map(lambda r: tuple(sorted(r))) .distinct() .take(5) 
            ^
SyntaxError: invalid syntax 

更新4:

from pyspark.sql import Row 

data = [(Row(id='label1'), Row(id='label1')), 
     (Row(id='label1'), Row(id='label6')), 
     (Row(id='label6'), Row(id='label1')), 
     (Row(id='label6'), Row(id='label6')), 
     (Row(id='label6'), Row(id='label2')), 
     (Row(id='label3'), Row(id='label3'))] 

df = sc.parallelize(data) 

df.groupByKey().flatMap(lambda (x,y): fn(x,y)).map(lambda r: tuple(sorted(r))).distinct().take(5) 

回報:

File "<ipython-input-302-dd76d08c9530>", line 12 
    df.groupByKey().flatMap(lambda (x,y): fn(x,y)).map(lambda r: tuple(sorted(r))).distinct().take(5) 
           ^
SyntaxError: invalid syntax 
+0

@ blue-sky,你爲什麼放棄這個記錄?(行(id ='label3'),行(id ='label3'),label3沒有出現在任何地方? – Rags

+0

@Rags我放棄記錄,因爲它是在兩個元組位置相同的id。 –

回答

0

請看看這對你的作品

這是您的數據

data = [(Row(id='label1'), Row(id='label1')), 
     (Row(id='label1'), Row(id='label6')), 
     (Row(id='label6'), Row(id='label1')), 
     (Row(id='label6'), Row(id='label6')), 
     (Row(id='label6'), Row(id='label2')), 
     (Row(id='label3'), Row(id='label3'))] 

df = sc.parallelize(data) 

下面會給你想要的東西:

def fn(k, rl): 
    fil=[] 
    for row in rl: 
     if row != k: 
      fil.append((k, row)) 
    return fil 

df.groupByKey()\ 
    .flatMap(lambda (x,y): fn(x,y))\ 
    .map(lambda r: tuple(sorted(r)))\ 
    .distinct()\ 
    .take(5) 

[(Row(id='label2'), Row(id='label6')), 
(Row(id='label1'), Row(id='label6'))] 

希望這會有幫助

+0

謝謝,但是失敗,請參閱問題更新。Python3是否適用於您? –

+1

對不起,忘了添加函數定義,請現在檢查。同樣可以通過一個簡單的方法實現'df.filter(lambda x:x [0]!= x [1])。map(sorted).map(tuple).distinct()。take(5)'like user6910411 mentioned – Rags

+0

你改變了什麼?我嘗試了你的更新代碼,但是出現錯誤,請參閱更新4。它確實出現'df.filter(lambda x:x [0]!= x [1]).map(sorted).map(tuple).distinct()。take(5)'也可以工作 –