2017-06-15 148 views

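Joining two Spark mllib pipelines together

I have two separate DataFrames, each of which goes through several different processing stages that I handle with mllib transformers in a pipeline.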

I now want to join these two pipelines together, keeping the features (columns) from each DataFrame.

Scikit-learn has the FeatureUnion class for handling this, but I can't seem to find an mllib equivalent.
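For reference, scikit-learn's FeatureUnion fits several transformers on the same input and concatenates their outputs side by side. A minimal sketch, assuming scikit-learn and NumPy are installed; the two column-selecting FunctionTransformer branches are illustrative stand-ins for real preprocessing pipelines. Both branches receive the same input matrix, so this is only a rough analogue of combining two separate DataFrames.

```python
import numpy as np
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import FunctionTransformer

# Toy data with columns x1, x2, x3, x4
X = np.array([[0.0, 1.0, 1.0, 0.0],
              [1.0, 0.0, 0.0, 1.0]])

# Two illustrative branches, each selecting its own columns.
# In practice each branch would be a full sklearn Pipeline.
union = FeatureUnion([
    ("first", FunctionTransformer(lambda A: A[:, :2])),
    ("second", FunctionTransformer(lambda A: A[:, 2:])),
])

# The outputs of both branches are stacked column-wise
combined = union.fit_transform(X)
print(combined)
```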

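I could add a custom transformer stage at the end of one pipeline that takes the DataFrame produced by the other pipeline as an attribute and joins it in the transform method, but that seems messy.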


Is it a join or a union you are looking for? Both can be done with DataFrames. – jamborta


@jamborta It is a join, but I would like to do it as a pipeline stage so that I can do schema checking across the whole pipeline. – Anake

Answer


Pipeline and PipelineModel are valid PipelineStages, and as such can be combined in a single Pipeline. For example, with:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# The pyspark shell already defines `spark`; otherwise create it
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

you can combine Pipelines:

Pipeline(stages=[
    pipeline1, pipeline2,
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df).show()
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2|         features|
+-----+---+---+---+---+---------+---------+-----------------+
|  1.0|  0|  1|  1|  0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|  0.0|  1|  0|  0|  1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

or pre-fitted PipelineModels:

model1 = pipeline1.fit(df)
model2 = pipeline2.fit(df)

Pipeline(stages=[
    model1, model2,
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df).show()
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2|         features|
+-----+---+---+---+---+---------+---------+-----------------+
|  1.0|  0|  1|  1|  0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|  0.0|  1|  0|  0|  1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

So the approach I would suggest is to join the data beforehand, and then fit and transform the whole DataFrame.