2016-06-09 54 views
0

我有一個Spark DataFrame,我試圖根據以前的列創建一個新的列,但是對我而言,困難的部分是我已經按行計算了列的值。例如:在PySpark中添加基於行的操作的列

COL1 | COL2 | COL3

1 | 2 | 3

4 | 5 | 0

3 | 1 | 1

所以,我想其中有每行表達
MAX(COL1,COL2,COL3)的列名新列。因此,所需的輸出:

COL1 | COL2 | COL3 | COL4

1 | 2 | 3 | 'col3'

4 | 5 | 0 | 'col2'

3 | 1 | 1 | 'col1'

無論如何,它有可能在PySpark中做?

+0

對於您發佈的數據,您希望的輸出是什麼樣的? –

+0

我已更新問題以反映所需的輸出。 – Hemant

+0

如果有領帶怎麼辦?如果兩個數字是相同的呢? –

回答

0

這不是一個理想的答案,因爲它會迫使你回到RDD。如果我找到一個更好的,允許你留在DataFrame宇宙,我會更新我的答案。但現在這應該起作用。

a = sc.parallelize([[1,2,3],[4,5,0],[3,1,1]]) 
headers = ["col1", "col2", "col3"] 

b = a.map(lambda x: (x[0], x[1], x[2], headers[x.index(max(x))])) 

b.toDF(headers.append("max_col")).show() 

這基本上可以讓你通過你的RDD基於行迭代使用max操作從蟒蛇。然後它通過索引標題列表來找到正確的列。

再次,我不確定這是最好的方法,我希望能找到更好的方法。