2017-06-08 69 views
1

我想在Java中創建一個自定義Spark Transformer。在Java中創建一個自定義Transformer spark ml

Transformer是文本預處理器,它像Tokenizer一樣工作。它將輸入列和輸出列作爲參數。

我環顧四周,發現了2個Scala Traits HasInputCol和HasOutputCol。

如何創建一個擴展Transformer並實現HasInputCol和OutputCol的類?

我的目標是這樣的。

// Dataset that have a String column named "text" 
    DataSet<Row> dataset; 

    CustomTransformer customTransformer = new CustomTransformer(); 
    customTransformer.setInputCol("text"); 
    customTransformer.setOutputCol("result"); 

    // result that have 2 String columns named "text" and "result" 
    DataSet<Row> result = customTransformer.transform(dataset); 

回答

1

您可能想要從org.apache.spark.ml.UnaryTransformer繼承CustomTransformer。您可以嘗試這樣的事:

import org.apache.spark.ml.UnaryTransformer; 
import org.apache.spark.ml.util.Identifiable$; 
import org.apache.spark.sql.types.DataType; 
import org.apache.spark.sql.types.DataTypes; 
import scala.Function1; 
import scala.collection.JavaConversions$; 
import scala.collection.immutable.Seq; 

import java.util.Arrays; 

public class MyCustomTransformer extends UnaryTransformer<String, scala.collection.immutable.Seq<String>, MyCustomTransformer> 
{ 
    private final String uid = Identifiable$.MODULE$.randomUID("mycustom"); 

    @Override 
    public String uid() 
    { 
     return uid; 
    } 


    @Override 
    public Function1<String, scala.collection.immutable.Seq<String>> createTransformFunc() 
    { 
     // can't use labmda syntax :(
     return new scala.runtime.AbstractFunction1<String, Seq<String>>() 
     { 
      @Override 
      public Seq<String> apply(String s) 
      { 
       // do the logic 
       String[] split = s.toLowerCase().split("\\s"); 
       // convert to Scala type 
       return JavaConversions$.MODULE$.iterableAsScalaIterable(Arrays.asList(split)).toList(); 
      } 
     }; 
    } 


    @Override 
    public void validateInputType(DataType inputType) 
    { 
     super.validateInputType(inputType); 
     if (inputType != DataTypes.StringType) 
      throw new IllegalArgumentException("Input type must be string type but got " + inputType + "."); 
    } 

    @Override 
    public DataType outputDataType() 
    { 
     return DataTypes.createArrayType(DataTypes.StringType, true); // or false? depends on your data 
    } 
} 
+0

這不起作用。我想這是因爲一個錯誤。我得到'''java.lang.IllegalArgumentException:需求失敗:參數d7ac3108-799c-4aed-a093-c85d12833a4e__inputCol不屬於fe3d99ba-e4eb-4e95-9412-f84188d936e3.''' – LonsomeHell

+0

@LonsomeHell,只是爲了仔細檢查,你確定你配置了一個有效的輸入列嗎? – SergGr

+0

是的,我用setInput和一個有效的列名。 – LonsomeHell

0

作爲SergGr建議,可以延長UnaryTransformer。然而這非常棘手。

注意:以下所有註釋均適用於Spark版本2.2.0。

爲了解決SPARK-12606,在那裏他們得到"...Param null__inputCol does not belong to..."描述的問題,您應該實現String uid()這樣的:

@Override 
public String uid() { 
    return getUid(); 
} 

private String getUid() { 

    if (uid == null) { 
     uid = Identifiable$.MODULE$.randomUID("mycustom"); 
    } 
    return uid; 
} 

在構造顯然他們初始化UID。但事情是在繼承類中初始化uid之前,UnaryTransformer的inputCol(和outputCol)被初始化。見HasInputCol

final val inputCol: Param[String] = new Param[String](this, "inputCol", "input column name") 

這是Param是如何構建的:

def this(parent: Identifiable, name: String, doc: String) = this(parent.uid, name, doc) 

因此,當parent.uid評估,定製uid()實現被稱爲在這一點上uid仍然是空。通過執行uid()和惰性評估,確保uid()永不返回空值。

你的情況,但:

Param d7ac3108-799c-4aed-a093-c85d12833a4e__inputCol does not belong to fe3d99ba-e4eb-4e95-9412-f84188d936e3 

這似乎有點不同。由於"d7ac3108-799c-4aed-a093-c85d12833a4e" != "fe3d99ba-e4eb-4e95-9412-f84188d936e3",它看起來像你的實施uid()方法返回每個調用的新值。也許在你的情況下,它是實現了它這樣:延長UnaryTransformer

@Override 
public String uid() { 
    return Identifiable$.MODULE$.randomUID("mycustom"); 
} 

順便說一句,確保變換函數是Serializable