2016-05-04 132 views
3

我有一個DataFrame兩列:pyspark:結合數據幀的行成DenseVector

df = sqlContext.createDataFrame([ 
    (1, 'a'), (2, 'a'), 
    (3, 'b'), (4, 'b'), 
    (5, 'c'), (6, 'c'), 
    (7, 'd'), (8, 'd'), 
], schema=['value', 'name']) 

編輯2017年1月13日: 我從基於實體 - 屬性 - 值SQL表得出這個數據幀模型。因此,每一行都會有一個額外的第三個實體列「id」。

我想根據ml包的分類器的要求將其轉換爲「特徵」DataFrame。對於單個列這可以通過使用可以實現VectorAssembler

from pyspark.ml.feature import VectorAssembler 

assembler = VectorAssembler(inputCols=['value'], outputCol="features") 
selected_features = assembler.transform(df).select('features') 
selected_features.collect() 

[Row(features=DenseVector([1.0])), 
Row(features=DenseVector([2.0])), 
Row(features=DenseVector([3.0])), 
Row(features=DenseVector([4.0])), 
Row(features=DenseVector([5.0])), 
Row(features=DenseVector([6.0])), 
Row(features=DenseVector([7.0])), 
Row(features=DenseVector([8.0]))] 

我想是這樣的:

[Row(features=DenseVector([1.0, 2.0])), 
Row(features=DenseVector([3.0, 4.0])), 
Row(features=DenseVector([5.0, 6.0])), 
Row(features=DenseVector([7.0, 8.0]))] 

什麼是最有效的方式來value列的值組合成基於一個DenseVector列的值爲name

我在想例如一個自定義的聚合函數爲GroupedData,將與groupby工作:

df.groupby('name').vector_agg().collect() 

類似的PostgreSQL array_agg功能:

SELECT array_agg(df.value) FROM table as df 
GROUP BY df.name; 

回答

1

我認爲你的問題是不明確的,因爲對於一個固定的name沒有辦法知道哪些value屬於哪一列。 ml包中的分類器將全部要求每個列在訓練樣本之間一致地使用。在你的例子中,列正好按照所需順序提供,但實際上你不能依賴這個。

你的問題是可以解決的,如果你可以給你的功能指標,並與像這樣開始:由name

df = sc.sql.createDataFrame([ 
    ('a', ('f1', 1)), ('a', ('f2', 2)), 
    ('b', ('f1', 3)), ('b', ('f2', 4)), 
    ('c', ('f1', 5)), ('c', ('f2', 6)), 
    ('d', ('f1', 7)), ('d', ('f2', 8)), 
], schema=['name', 'feature']) 

首先,組和收集您的特點列表:

import pyspark.sql.functions as F 

df.groupBy('name')\ 
    .agg(F.collect_list('feature'))\ 
    .show() 

輸出:

+----+---------------------+ 
|name|collect_list(feature)| 
+----+---------------------+ 
| d|  [[f1,7], [f2,8]]| 
| c|  [[f1,5], [f2,6]]| 
| b|  [[f1,3], [f2,4]]| 
| a|  [[f1,1], [f2,2]]| 
+----+---------------------+ 

接下來,使用udf在withColumn將此數組轉換爲DenseVector。全部放在一起:

from pyspark.ml.linalg import Vectors, VectorUDT 
import pyspark.sql.functions as F 

list_to_dense = F.udf(lambda l: Vectors.dense([v for (k,v) in sorted(l)]), VectorUDT()) 

df.groupBy('name')\ 
    .agg(F.collect_list('features'))\ 
    .withColumn('features', list_to_dense('collect_list(features)'))\ 
    .select('features')\ 
    .collect() 

輸出:

[Row(features=DenseVector([7.0, 8.0])), 
Row(features=DenseVector([5.0, 6.0])), 
Row(features=DenseVector([3.0, 4.0])), 
Row(features=DenseVector([1.0, 2.0]))] 
+0

你是對的!該問題與基於實體 - 屬性 - 值模型處理SQL表有關。所以會有第三個實體專欄。我會相應地更新這個問題。 – mdh

+0

'collect_list'就是我正在尋找的東西 – mdh

1

從你只需要你的數據結構用同一個表格做joinfilter這些行的values是相同的(或反轉的)。

df = sqlContext.createDataFrame([ 
    (1, 'a'), (2, 'a'), 
    (3, 'b'), (4, 'b'), 
    (5, 'c'), (6, 'c'), 
    (7, 'd'), (8, 'd'), 
], schema=['value', 'name']) 

xf = df.select(df["name"].alias("nam"), df["value"].alias("val")) 
pf = df.join(xf, df["name"] == xf["nam"], "inner").where(xf["val"] < df["value"]).select(df["value"], xf["val"], df["name"]) 

from pyspark.ml.feature import VectorAssembler 


assembler = VectorAssembler(inputCols=['value', "val"], outputCol="features") 
selected_features = assembler.transform(pf).select('features') 
selected_features.collect() 


#[Row(features=DenseVector([2.0, 1.0])), 
# Row(features=DenseVector([4.0, 3.0])), 
# Row(features=DenseVector([6.0, 5.0])), 
# Row(features=DenseVector([8.0, 7.0]))] 
+0

這工作得很好,因爲我提供的很簡單的例子,但是我正在尋找一個更通用的方法。我提出了我的問題。 – mdh