
I am looking for an efficient way to build sparse vectors in PySpark from a DataFrame.

Say, given this transactional input:

df = spark.createDataFrame([ 
    (0, "a"), 
    (1, "a"), 
    (1, "b"), 
    (1, "c"), 
    (2, "a"), 
    (2, "b"), 
    (2, "b"), 
    (2, "b"), 
    (2, "c"), 
    (0, "a"), 
    (1, "b"), 
    (1, "b"), 
    (2, "cc"), 
    (3, "a"), 
    (4, "a"), 
    (5, "c") 
], ["id", "category"]) 
+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       a|
|  1|       b|
|  1|       c|
|  2|       a|
|  2|       b|
|  2|       b|
|  2|       b|
|  2|       c|
|  0|       a|
|  1|       b|
|  1|       b|
|  2|      cc|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+

Or in aggregated form:

df.groupBy(df["id"],df["category"]).count().show() 
+---+--------+-----+
| id|category|count|
+---+--------+-----+
|  1|       b|    3|
|  1|       a|    1|
|  1|       c|    1|
|  2|      cc|    1|
|  2|       c|    1|
|  2|       a|    1|
|  2|       b|    3|
|  0|       a|    2|
|  3|       a|    1|
|  4|       a|    1|
|  5|       c|    1|
+---+--------+-----+

My goal is to get, per id, output like this:

+---+-----------------------------------------------+
| id|                                        feature|
+---+-----------------------------------------------+
|  2|SparseVector({a: 1.0, b: 3.0, c: 1.0, cc: 1.0})|
+---+-----------------------------------------------+

Could you point me in the right direction? Doing this with MapReduce in Java seems easier to me.

Answers


This can be done quite easily with pivot and VectorAssembler. Replace the aggregation with a pivot:

pivoted = df.groupBy("id").pivot("category").count().na.fill(0) 

and assemble:

from pyspark.ml.feature import VectorAssembler 

input_cols = [x for x in pivoted.columns if x != "id"] 

result = (VectorAssembler(inputCols=input_cols, outputCol="features") 
    .transform(pivoted) 
    .select("id", "features")) 

The result is shown below; whichever representation (dense or sparse) is more efficient is picked depending on sparsity:

+---+---------------------+
|id |features             |
+---+---------------------+
|0  |(5,[1],[2.0])        |
|5  |(5,[0,3],[5.0,1.0])  |
|1  |[1.0,1.0,3.0,1.0,0.0]|
|3  |(5,[0,1],[3.0,1.0])  |
|2  |[2.0,1.0,3.0,1.0,1.0]|
|4  |(5,[0,1],[4.0,1.0])  |
+---+---------------------+

Of course you can still convert everything to a single, sparse representation:

from pyspark.sql.functions import udf 
from pyspark.ml.linalg import SparseVector, VectorUDT 
import numpy as np 

def to_sparse(c): 
    def to_sparse_(v): 
        # already sparse: return as-is
        if isinstance(v, SparseVector): 
            return v 
        # dense: keep only the non-zero entries
        vs = v.toArray() 
        nonzero = np.nonzero(vs)[0] 
        return SparseVector(len(vs), nonzero, vs[nonzero]) 
    return udf(to_sparse_, VectorUDT())(c) 
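
To apply it, just wrap the assembled column; a minimal usage sketch (result and the features column are the ones built above):

result_sparse = result.withColumn("features", to_sparse("features")) 
result_sparse.show(truncate=False) 

which gives: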
+---+-------------------------------------+
|id |features                             |
+---+-------------------------------------+
|0  |(5,[1],[2.0])                        |
|5  |(5,[0,3],[5.0,1.0])                  |
|1  |(5,[0,1,2,3],[1.0,1.0,3.0,1.0])      |
|3  |(5,[0,1],[3.0,1.0])                  |
|2  |(5,[0,1,2,3,4],[2.0,1.0,3.0,1.0,1.0])|
|4  |(5,[0,1],[4.0,1.0])                  |
+---+-------------------------------------+

If you convert the DataFrame to an RDD, you can use reduceByKey in a mapreduce-like way. The only really tricky part here is formatting the data for Spark's SparseVector.

Import the packages and create the data

from pyspark.ml.feature import StringIndexer 
from pyspark.ml.linalg import Vectors 
df = sqlContext.createDataFrame([ 
    (0, "a"), 
    (1, "a"), 
    (1, "b"), 
    (1, "c"), 
    (2, "a"), 
    (2, "b"), 
    (2, "b"), 
    (2, "b"), 
    (2, "c"), 
    (0, "a"), 
    (1, "b"), 
    (1, "b"), 
    (2, "cc"), 
    (3, "a"), 
    (4, "a"), 
    (5, "c") 
], ["id", "category"]) 

Create a numeric representation of the categories (needed for the sparse vector)

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") 
df = indexer.fit(df).transform(df) 
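
If you want to check which numeric index each category received (StringIndexer assigns 0.0 to the most frequent category and counts up from there), a quick optional look at the mapping:

# optional sanity check: one row per (category, categoryIndex) pair 
df.select("category", "categoryIndex").distinct().show() 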

Group by id and category index to get the counts

df = df.groupBy(df["id"],df["categoryIndex"]).count() 

Convert to an RDD and map the data to key/value pairs of id and [(categoryIndex, count)]

rdd = df.rdd.map(lambda x: (x.id, [(x.categoryIndex, x['count'])])) 

Reduce by key to get, for each id, the list of all its (categoryIndex, count) pairs

rdd = rdd.reduceByKey(lambda a, b: a + b) 
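
Since every value at this point is a one-element list, the reduce step is plain list concatenation; a tiny standalone illustration of the same merge, with made-up (index, count) pairs:

# reduceByKey merges the per-row lists of the same id by concatenation 
merge = lambda a, b: a + b 
merge([(1.0, 3)], [(0.0, 1)])   # -> [(1.0, 3), (0.0, 1)] 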

Map each id's list of (categoryIndex, count) pairs to a sparse vector

rdd = rdd.map(lambda x: (x[0], Vectors.sparse(len(x[1]), x[1]))) 
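
Vectors.sparse accepts a size plus a list of (index, value) pairs, which is exactly the shape produced by the previous step; a small standalone example with made-up values:

from pyspark.ml.linalg import Vectors 
# two non-zero entries, at indices 0 and 2, in a vector of size 3 
v = Vectors.sparse(3, [(0, 2.0), (2, 1.0)]) 
print(v)   # (3,[0,2],[2.0,1.0]) 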

Convert back to a DataFrame

finalDf = sqlContext.createDataFrame(rdd, ['id', 'feature']) 

Check the data

finalDf.take(5) 

[Row(id=0, feature=SparseVector(1, {1: 2.0})), 
    Row(id=1, feature=SparseVector(3, {0: 3.0, 1: 1.0, 2: 1.0})), 
    Row(id=2, feature=SparseVector(4, {0: 3.0, 1: 1.0, 2: 1.0, 3: 1.0})), 
    Row(id=3, feature=SparseVector(1, {1: 1.0})), 
    Row(id=4, feature=SparseVector(1, {1: 1.0}))] 