2016-11-26 110 views
4

我有兩個DataFrame在我的火花(V1.5.0)代碼的交集大小連接兩個星火據幀:根據兩個陣列列

aDF = [user_id : Int, user_purchases: array<int> ] 
bDF = [user_id : Int, user_purchases: array<int> ] 

我想要做的是加入這兩個dataframes,但我只需要在aDF.user_purchasesbDF.user_purchases之間的交點具有多於2個元素(交集> 2)的線。

我必須使用RDD API還是可以使用org.apache.sql.functions中的某個函數?

回答

1

我看不出有任何功能內置,但您可以使用UDF:

import scala.collection.mutable.WrappedArray; 
val intersect = udf ((a : WrappedArray[Int], b : WrappedArray[Int]) => { 
    var count = 0; 
    a.foreach (x => { 
     if (b.contains(x)) count = count + 1; 
    }); 
    count; 
}); 
// test data sets 
val one = sc.parallelize(List(
     (1, Array(1, 2, 3)), 
     (2, Array(1,2 ,3, 4)), 
     (3, Array(1, 2,3)), 
     (4, Array(1,2)) 
     )).toDF("user", "arr"); 

val two = sc.parallelize(List(
     (1, Array(1, 2, 3)), 
     (2, Array(1,2 ,3, 4)), 
     (3, Array(1, 2, 3)), 
     (4, Array(1)) 
     )).toDF("user", "arr"); 

// usage 
one.join(two, one("user") === two("user")) 
    .select (one("user"), intersect(one("arr"), two("arr")).as("intersect")) 
    .where(col("intersect") > 2).show 

// version from comment 
one.join(two) 
    .select (one("user"), two("user"), intersect(one("arr"), two("arr")).as("intersect")). 
    where('intersect > 2).show 
+0

你的udf解決方案似乎解決了我的問題。只有一點注意,我不想加入相同的用戶標識,我想要不同的用戶ID至少有三個常用元素在數組中。 – Vektor88

+0

@ Vektor88所以交叉加入後+過濾器。將很慢,但沒有其他選擇 –

+0

@ Vektor88我已經更新了答案 –

1

一個可能的解決辦法是找到有趣的對與陣列增強這些。首先,讓我們導入一些功能:

import org.apache.spark.sql.functions.explode 

,並重新命名列:

val aDF_ = aDF.toDF("a_user_id", "a_user_purchases") 
val bDF_ = bDF.toDF("b_user_id", "b_user_purchases") 

對匹配的謂詞可確定爲:

val filtered = aDF_.withColumn("purchase", explode($"a_user_purchases")) 
    .join(bDF_.withColumn("purchase", explode($"b_user_purchases")), Seq("purchase")) 
    .groupBy("a_user_id", "b_user_id") 
    .count() 
    .where($"count" > 2) 

最後過濾後的數據可以與輸入數據集加盟獲得全部結果:

filtered.join(aDF_, Seq("a_user_id")).join(bDF_, Seq("b_user_id")).drop("count") 
+0

聰明的一個:)但是我認爲它可能會非常慢,因爲物理計劃比較大,包含很少的聯接和排序 –