我有以下形式的數據幀:如何在星火應用自定義過濾功能,數據幀
A_DF = |id_A: Int|concatCSV: String|
和另一個問題:
B_DF = |id_B: Int|triplet: List[String]|
的concatCSV
例子可能看起來像:
"StringD, StringB, StringF, StringE, StringZ"
"StringA, StringB, StringX, StringY, StringZ"
...
而triplet
是這樣的:
("StringA", "StringF", "StringZ")
("StringB", "StringU", "StringR")
...
我想產生笛卡爾一套A_DF
和B_DF
,例如;
| id_A: Int | concatCSV: String | id_B: Int | triplet: List[String] |
| 14 | "StringD, StringB, StringF, StringE, StringZ" | 21 | ("StringA", "StringF", "StringZ")|
| 14 | "StringD, StringB, StringF, StringE, StringZ" | 45 | ("StringB", "StringU", "StringR")|
| 18 | "StringA, StringB, StringX, StringY, StringG" | 21 | ("StringA", "StringF", "StringZ")|
| 18 | "StringA, StringB, StringX, StringY, StringG" | 45 | ("StringB", "StringU", "StringR")|
| ... | | | |
然後保持這一點至少有兩個子(例如StringA, StringB
)從A_DF("concatCSV")
出現在B_DF("triplet")
,即使用filter
排除那些不符合這個條件的記錄。
第一個問題是:我可以做到這一點,而無需將DF轉換成RDD?
第二個問題是:我可以做理想整個事情的join
一步 - 爲where
條件?
我曾嘗試用類似的實驗:
val cartesianRDD = A_DF
.join(B_DF,"right")
.where($"triplet".exists($"concatCSV".contains(_)))
但where
不能得到解決。我用filter
而不是where
嘗試過,但仍然沒有運氣。此外,由於某種奇怪的原因,鍵入cartesianRDD
的註釋是SchemaRDD
而不是DataFrame
。我是怎麼結束的?最後,我上面所嘗試的(我寫的短代碼)是不完整的,因爲它將保留僅在中發現的一個子字符串的記錄,其在triplet
中找到。
因此,第三個問題是:我應該改用RDDs並用自定義過濾函數解決它嗎?
最後,最後一個問題:我可以對DataFrames使用自定義過濾功能嗎?
感謝您的幫助。
不應該的類型'triplet'是'List [(String,String,String)]'? –
你還使用什麼版本的Spark? –
謝謝,修正它在示例 - 糟糕的措辭。我使用Spark 1.5.2 –