我試圖模糊地連接兩個數據集,其中一個引號和其中一個銷售。出於參數的原因,加入屬性是名字,姓氏,dob和電子郵件。平面圖內的星圖複製笛卡爾連接
我有26m +的報價和1m +的銷售額。客戶可能沒有使用一個或多個屬性的準確信息,所以我給他們每個匹配的分數(1,1,1,1),其中全部匹配(0,0,0,0),其中沒有比賽。
所以我結束了類似
q1, s1, (0,0,1,0)
q1, s2, (0,1,0,1)
q1, s3, (1,1,1,1)
q2, s1, (1,0,0,1)
...
q26000000 s1 (0,1,0,0)
東西,所以我認爲這是相當於我在我的管理使得大量分區的報價笛卡爾積
val quotesRaw = sc.textfile(....)
val quotes = quotesRaw.repartition(quotesRaw.count().toInt()/100000)
val sales = sc.textfile(...)
val sb = sc.broadcast(sales.collect())
quotes.mapPartitions(p=> (
p.flatMap(q => (
sb.value.map(s =>
q._1, s._1, (if q._2 == s._2 1 else 0, etc)
)
)
這一切都有效,如果我保持數字低,如26米報價,但只有1000銷售,但如果我運行它將所有銷售它只是停止響應時運行
我'用下面的配置運行它。
spark-submit --conf spark.akka.frameSize=1024 --conf spark.executor.memory=3g --num-executors=30 --driver-memory 6g --class SalesMatch --deploy-mode client --master yarn SalesMatching-0.0.1-SNAPSHOT.jar hdfs://cluster:8020/data_import/Sales/SourceSales/2014/09/01/SourceSales_20140901.txt hdfs://cluster:8020/data_import/CDS/Enquiry/2014/01/01/EnquiryBackFill_20140101.txt hdfs://cluster:8020/tmp/_salesdata_matches_new
有沒有什麼東西跳出來顯然不正確?
這要看情況。從RDD創建廣播是一個昂貴的過程,尤其是當RDD很大時(單個銷售記錄的大小是多少?)。首先,您必須將所有數據傳輸到驅動程序,然後分發給工作人員。這意味着重複的序列化/反序列化,網絡流量和存儲數據的成本。 – zero323
@ zero323一旦我已經預測出我感興趣的屬性,它對於11m的銷售量最終只有40mb。在修剪爲必需的屬性後,引號結束大約3GB。鑑於3節點盒子上的這些尺寸,我不知道爲什麼它如此緩慢。 (即使接受q * s性質的結果 – owen79
因此,廣播不太可能是一個問題,另一個腥味是笛卡爾產品本身,每個分區10k報價和11M銷售,每個分區產生大約4TB(11e11條目)if我正確的計算,我有理由在flatMap中過濾相似性很低的文件, – zero323