2016-11-30 20 views
1

我有以下形式的數據幀:如何在星火應用自定義過濾功能,數據幀

A_DF = |id_A: Int|concatCSV: String| 

和另一個問題:

B_DF = |id_B: Int|triplet: List[String]| 

concatCSV例子可能看起來像:

"StringD, StringB, StringF, StringE, StringZ" 
"StringA, StringB, StringX, StringY, StringZ" 
... 

triplet是這樣的:

("StringA", "StringF", "StringZ") 
("StringB", "StringU", "StringR") 
... 

我想產生笛卡爾一套A_DFB_DF,例如;

| id_A: Int | concatCSV: String        | id_B: Int | triplet: List[String]   | 
|  14 | "StringD, StringB, StringF, StringE, StringZ" |  21 | ("StringA", "StringF", "StringZ")| 
|  14 | "StringD, StringB, StringF, StringE, StringZ" |  45 | ("StringB", "StringU", "StringR")| 
|  18 | "StringA, StringB, StringX, StringY, StringG" |  21 | ("StringA", "StringF", "StringZ")| 
|  18 | "StringA, StringB, StringX, StringY, StringG" |  45 | ("StringB", "StringU", "StringR")| 
| ... |            |   |         | 

然後保持這一點至少有兩個(例如StringA, StringB)從A_DF("concatCSV")出現在B_DF("triplet"),即使用filter排除那些不符合這個條件記錄。

第一個問題是:我可以做到這一點,而無需將DF轉換成RDD?

第二個問題是:我可以做理想整個事情的join一步 - 爲where條件?

我曾嘗試用類似的實驗:

val cartesianRDD = A_DF 
    .join(B_DF,"right") 
    .where($"triplet".exists($"concatCSV".contains(_))) 

where不能得到解決。我用filter而不是where嘗試過,但仍然沒有運氣。此外,由於某種奇怪的原因,鍵入cartesianRDD的註釋是SchemaRDD而不是DataFrame。我是怎麼結束的?最後,我上面所嘗試的(我寫的短代碼)是不完整的,因爲它將保留僅在中發現的一個子字符串的記錄,其在triplet中找到。

因此,第三個問題是:我應該改用RDDs並用自定義過濾函數解決它嗎?

最後,最後一個問題:我可以對DataFrames使用自定義過濾功能嗎?

感謝您的幫助。

+0

不應該的類型'triplet'是'List [(String,String,String)]'? –

+0

你還使用什麼版本的Spark? –

+0

謝謝,修正它在示例 - 糟糕的措辭。我使用Spark 1.5.2 –

回答

2

功能CROSS JOINHive實現的,所以你可以先做交叉聯接使用Hive SQL

A_DF.registerTempTable("a") 
B_DF.registerTempTable("b") 

// sqlContext should be really a HiveContext 
val result = sqlContext.sql("SELECT * FROM a CROSS JOIN b") 

然後你可以過濾到使用兩個udf你期望的輸出。一個你的字符串轉換爲字的陣列,並且第二個,讓我們相交所得陣列列的長度和現有柱"triplet"

import scala.collection.mutable.WrappedArray 
import org.apache.spark.sql.functions.col 

val splitArr = udf { (s: String) => s.split(",").map(_.trim) } 
val commonLen = udf { (a: WrappedArray[String], 
         b: WrappedArray[String]) => a.intersect(b).length } 

val temp = (result.withColumn("concatArr", 
    splitArr(col("concatCSV"))).select(col("*"), 
    commonLen(col("triplet"), col("concatArr")).alias("comm")) 
    .filter(col("comm") >= 2) 
    .drop("comm") 
    .drop("concatArr")) 

temp.show 
+----+--------------------+----+--------------------+ 
|id_A|   concatCSV|id_B|    triplet| 
+----+--------------------+----+--------------------+ 
| 14|StringD, StringB,...| 21|[StringA, StringF...| 
| 18|StringA, StringB,...| 21|[StringA, StringF...| 
+----+--------------------+----+--------------------+ 
+1

完美的答案。謝謝! –