2017-10-13 49 views
1

我有2個數據框,每個數組都有Array [String]作爲其中一列。對於一個數據框中的每個條目,我需要找出其他數據框中的子集(如果有的話)。一個例子是在這裏:Apache Spark - 查找數組/列表/子集

DF1:

---------------------------------------------------- 
      id : Long | labels : Array[String] 
--------------------------------------------------- 
     10    | [label1, label2, label3] 
     11    | [label4, label5] 
     12    | [label6, label7] 

DF2:

---------------------------------------------------- 
     item : String | labels : Array[String] 
--------------------------------------------------- 
     item1   | [label1, label2, label3, label4, label5] 
     item2   | [label4, label5] 
     item3   | [label4, label5, label6, label7] 

我描述的子集操作之後,預計O/p應該

DF3:

---------------------------------------------------- 
     item : String | id : Long 
--------------------------------------------------- 
     item1   | [10, 11] 
     item2   | [11] 
     item3   | [11, 12] 

它是g保證DF2在DF1中總是有相應的子集,所以不會有任何剩餘的元素。

有人可以請幫助正確的方法嗎?它看起來像DF2中的每個元素,我需要掃描DF1並在第二列上進行子集操作(或設置減法),直到找到所有子集並耗盡該行中的標籤,並在此過程中累積「id 「領域。我如何以緊湊和高效的方式來做到這一點?任何幫助是極大的讚賞。實際上,我可能在DF1中有100個元素,在DF2中有1000個元素。

回答

0

我不知道有什麼辦法以有效的方式執行這種操作。但是,下面是使用UDF以及笛卡爾連接的一種可能的解決方案。

UDF需要兩個序列,並檢查是否在所述第二存在於第一所有字符串:

val matchLabel = udf((array1: Seq[String], array2: Seq[String]) => { 
    array1.forall{x => array2.contains(x)} 
}) 

要使用笛卡爾加入,它需要被使能,因爲它是計算昂貴的。

val spark = SparkSession.builder.getOrCreate() 
spark.conf.set("spark.sql.crossJoin.enabled", true) 

使用UDF將兩個數據幀連接在一起。之後,生成的數據框按item列分組以收集所有ID的列表。使用相同的DF1DF2如問題:

val DF3 = DF2.join(DF1, matchLabel(DF1("labels"), DF2("labels"))) 
    .groupBy("item") 
    .agg(collect_list("id").as("id")) 

結果如下:

+-----+--------+ 
| item|  id| 
+-----+--------+ 
|item3|[11, 12]| 
|item2| [11]| 
|item1|[10, 11]| 
+-----+--------+ 
+1

感謝您的解決方案..它的工作就像一個魅力。如預期的那樣,不是最優的,但是在功能上用於驗證我感興趣的算法 –