
Search and replace in Apache Spark

We created two datasets, sentenceDataFrame and sentenceDataFrame2, where the search and replace should happen.

sentenceDataFrame2 stores the search and replace terms. We also tried all 11 join types ('inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right', 'leftsemi', 'leftanti', 'cross'), but none of them gave us the result we wanted.

Could you let us know where we are going wrong and kindly point us in the right direction?

List<Row> data = Arrays.asList(
    RowFactory.create(0, "Allen jeevi pramod Allen"),
    RowFactory.create(1, "sandesh Armstrong jeevi"),
    RowFactory.create(2, "harsha Nischay DeWALT"));

StructType schema = new StructType(new StructField[] {
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) });
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

List<Row> data2 = Arrays.asList(
    RowFactory.create("Allen", "Apex Tool Group"),
    RowFactory.create("Armstrong", "Apex Tool Group"),
    RowFactory.create("DeWALT", "StanleyBlack"));

StructType schema2 = new StructType(new StructField[] {
    new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
    new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) });
Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

// This condition compares the integer id (label) with the search term (label2),
// so no pair of rows ever satisfies it.
Dataset<Row> remainingElements = sentenceDataFrame.join(sentenceDataFrame2,
    sentenceDataFrame.col("label").equalTo(sentenceDataFrame2.col("label2")), "cross");
System.out.println("Join count: " + remainingElements.count());

Input

Allen jeevi pramod Allen
sandesh Armstrong jeevi
harsha Nischay DeWALT

Expected output

Apex Tool Group jeevi pramod Apex Tool Group
sandesh Apex Tool Group jeevi
harsha Nischay StanleyBlack

Answer


For a join condition that is not a simple equality, like this one, you can use a Spark user-defined function (UDF).
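To make the semantics of such a join concrete, here is a plain-Java sketch (no Spark involved) of a nested-loop join whose condition is "sentence contains term". Every (sentence, term) pair that satisfies the condition becomes one output row, and the replacement is applied per row. The class and method names (`ContainsJoinSketch`, `joinAndReplace`) are hypothetical, and the sketch assumes Java 9+ for `List.of` and `Map.entry`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ContainsJoinSketch {

    // Hypothetical helper: a nested-loop join on the non-equality predicate
    // "sentence contains term". Every matching (sentence, term) pair yields one
    // output row, and only that row's term is replaced.
    public static List<String> joinAndReplace(List<String> sentences,
                                              List<Map.Entry<String, String>> terms) {
        List<String> rows = new ArrayList<>();
        for (String sentence : sentences) {
            for (Map.Entry<String, String> term : terms) {
                if (sentence.contains(term.getKey())) {
                    // String.replace substitutes every literal occurrence of this one term
                    rows.add(sentence.replace(term.getKey(), term.getValue()));
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> sentences = List.of(
                "Allen Armstrong jeevi pramod Allen",
                "sandesh Armstrong jeevi",
                "harsha nischay DeWALT");
        List<Map.Entry<String, String>> terms = List.of(
                Map.entry("Allen", "Apex Tool Group"),
                Map.entry("Armstrong", "Apex Tool Group"),
                Map.entry("DeWALT", "StanleyBlack"));
        // The first sentence matches two terms, so it is emitted twice,
        // each time with only one of the two terms replaced.
        joinAndReplace(sentences, terms).forEach(System.out::println);
    }
}
```

The point of the sketch is the shape of the result: a sentence that contains two search terms comes out as two rows, each with a single replacement, because that is what a join with this predicate produces.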

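Below is a JUnit snippet. It won't compile as-is, but it shows the relevant imports and logic. The Java API is quite verbose; I'll leave the Scala version, which would be more concise, as an exercise for the reader.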

A static import is required for the callUDF() and col() methods.

import static org.apache.spark.sql.functions.*;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;

@Test 
public void testSomething() { 
    List<Row> data = Arrays.asList(
     RowFactory.create(0, "Allen jeevi pramod Allen"), 
     RowFactory.create(1, "sandesh Armstrong jeevi"), 
     RowFactory.create(2, "harsha Nischay DeWALT") 
    ); 

    StructType schema = new StructType(new StructField[] { 
     new StructField("label", DataTypes.IntegerType, false, Metadata.empty()), 
     new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) 
    }); 
    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 

    List<Row> data2 = Arrays.asList(
     RowFactory.create("Allen", "Apex Tool Group"), 
     RowFactory.create("Armstrong","Apex Tool Group"), 
     RowFactory.create("DeWALT","StanleyBlack") 
    ); 

    StructType schema2 = new StructType(new StructField[] { 
     new StructField("label2", DataTypes.StringType, false, Metadata.empty()), 
     new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) 
    }); 
    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2); 

    UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() { 
     private static final long serialVersionUID = -5239951370238629896L; 

     @Override 
     public Boolean call(String t1, String t2) throws Exception { 
      return t1.contains(t2); 
     } 
    }; 
    spark.udf().register("contains", contains, DataTypes.BooleanType); 

    UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() { 
     private static final long serialVersionUID = -2882956931420910207L; 

     @Override 
     public String call(String t1, String t2, String t3) throws Exception { 
      // replace() substitutes every literal occurrence; replaceAll() would interpret t2 as a regex
      return t1.replace(t2, t3);
     } 
    }; 
    spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType); 

    // Join on a non-equality condition (sentence contains label2), then replace the matched term in each joined row
    Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
              .withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
              .select(col("sentence_replaced"));

    joined.show(false); 
} 

Output:

+--------------------------------------------+
|sentence_replaced                           |
+--------------------------------------------+
|Apex Tool Group jeevi pramod Apex Tool Group|
|sandesh Apex Tool Group jeevi               |
|harsha Nischay StanleyBlack                 |
+--------------------------------------------+

Thanks @Ivan Gozali, it works perfectly. – Nischay


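Still facing a similar issue.

Input

Allen Armstrong jeevi pramod Allen
sandesh Armstrong jeevi
harsha nischay DeWALT

Output

Apex Tool Group Armstrong jeevi pramod Apex Tool Group
Allen Apex Tool Group jeevi pramod Allen
sandesh Apex Tool Group jeevi
harsha nischay StanleyBlack

Expected output

Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group
sandesh Apex Tool Group jeevi
harsha nischay StanleyBlack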

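We get this output when multiple replacements have to be made in a single row.

Is there another approach we should follow to get the correct output, or is this a limitation of UDFs?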


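We can get the expected output by chaining replaceAll calls inside a single UDF, so that every replacement is applied to the same row instead of producing one joined row per matched term.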

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Test {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JavaTokenizerExample")
                .master("local[*]")
                .getOrCreate();

        List<Row> data = Arrays.asList(
                RowFactory.create(0, "Allen jeevi pramod Allen"),
                RowFactory.create(1, "sandesh Armstrong jeevi"),
                RowFactory.create(2, "harsha Nischay DeWALT"));

        StructType schema = new StructType(new StructField[] {
                new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) });
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

        // Apply every replacement in sequence to the same string, so a sentence
        // containing several search terms has all of them replaced in one row
        UDF1<String, String> mode = new UDF1<String, String>() {
            @Override
            public String call(final String types) throws Exception {
                return types.replaceAll("Allen", "Apex Tool Group")
                        .replaceAll("Armstrong", "Apex Tool Group")
                        .replaceAll("DeWALT", "StanleyBlack");
            }
        };
        spark.udf().register("mode", mode, DataTypes.StringType);

        sentenceDataFrame.createOrReplaceTempView("people");
        // Alias the UDF result so the output column is named "sentence"
        Dataset<Row> newDF = spark.sql("SELECT mode(sentence) AS sentence, label FROM people");
        newDF.show(false);

        spark.stop();
    }
}

Output

+--------------------------------------------+-----+
|sentence                                    |label|
+--------------------------------------------+-----+
|Apex Tool Group jeevi pramod Apex Tool Group|0    |
|sandesh Apex Tool Group jeevi               |1    |
|harsha Nischay StanleyBlack                 |2    |
+--------------------------------------------+-----+
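The UDF in this answer hard-codes the three search/replace pairs. A minimal plain-Java sketch of the same idea driven by a lookup table instead, assuming the pairs from sentenceDataFrame2 are small enough to bring to the driver (for example with `collectAsList()`) and hand to the UDF body as an ordered map. `SequentialReplaceSketch` and `replaceAllTerms` are hypothetical names; Spark itself is not used here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SequentialReplaceSketch {

    // Hypothetical helper: applies every (search -> replacement) pair to the
    // sentence in the map's iteration order, so all matching terms in one
    // sentence are replaced within a single row.
    public static String replaceAllTerms(String sentence, Map<String, String> terms) {
        String result = sentence;
        for (Map.Entry<String, String> term : terms.entrySet()) {
            // String.replace is literal: no regex metacharacters or $-groups to escape
            result = result.replace(term.getKey(), term.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the replacement order deterministic
        Map<String, String> terms = new LinkedHashMap<>();
        terms.put("Allen", "Apex Tool Group");
        terms.put("Armstrong", "Apex Tool Group");
        terms.put("DeWALT", "StanleyBlack");

        List<String> sentences = List.of(
                "Allen Armstrong jeevi pramod Allen",
                "sandesh Armstrong jeevi",
                "harsha nischay DeWALT");
        for (String s : sentences) {
            System.out.println(replaceAllTerms(s, terms));
        }
    }
}
```

Because the replacements run in sequence, order matters: if a replacement value itself contained a later search term, that text would be replaced again. With the terms used in this thread that cannot happen.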