2015-09-25 24 views
0

我試圖模糊地連接兩個數據集,其中一個引號和其中一個銷售。出於參數的原因,加入屬性是名字,姓氏,dob和電子郵件。平面圖內的星圖複製笛卡爾連接

我有26m +的報價和1m +的銷售額。客戶可能沒有使用一個或多個屬性的準確信息,所以我給他們每個匹配的分數(1,1,1,1),其中全部匹配(0,0,0,0),其中沒有比賽。

所以我結束了類似

q1, s1, (0,0,1,0) 
q1, s2, (0,1,0,1) 
q1, s3, (1,1,1,1) 
q2, s1, (1,0,0,1) 
... 
q26000000 s1 (0,1,0,0) 

東西,所以我認爲這是相當於我在我的管理使得大量分區的報價笛卡爾積

val quotesRaw = sc.textfile(....) 
val quotes = quotesRaw.repartition(quotesRaw.count().toInt()/100000) 

val sales = sc.textfile(...) 
val sb = sc.broadcast(sales.collect()) 

quotes.mapPartitions(p=> (
    p.flatMap(q => (
     sb.value.map(s => 
      q._1, s._1, (if q._2 == s._2 1 else 0, etc) 
    ) 
) 

這一切都有效,如果我保持數字低,如26米報價,但只有1000銷售,但如果我運行它將所有銷售它只是停止響應時運行

我'用下面的配置運行它。

spark-submit --conf spark.akka.frameSize=1024 --conf spark.executor.memory=3g --num-executors=30 --driver-memory 6g --class SalesMatch --deploy-mode client --master yarn SalesMatching-0.0.1-SNAPSHOT.jar hdfs://cluster:8020/data_import/Sales/SourceSales/2014/09/01/SourceSales_20140901.txt hdfs://cluster:8020/data_import/CDS/Enquiry/2014/01/01/EnquiryBackFill_20140101.txt hdfs://cluster:8020/tmp/_salesdata_matches_new 

有沒有什麼東西跳出來顯然不正確?

+0

這要看情況。從RDD創建廣播是一個昂貴的過程,尤其是當RDD很大時(單個銷售記錄的大小是多少?)。首先,您必須將所有數據傳輸到驅動程序,然後分發給工作人員。這意味着重複的序列化/反序列化,網絡流量和存儲數據的成本。 – zero323

+1

@ zero323一旦我已經預測出我感興趣的屬性,它對於11m的銷售量最終只有40mb。在修剪爲必需的屬性後,引號結束大約3GB。鑑於3節點盒子上的這些尺寸,我不知道爲什麼它如此緩慢。 (即使接受q * s性質的結果 – owen79

+0

因此,廣播不太可能是一個問題,另一個腥味是笛卡爾產品本身,每個分區10k報價和11M銷售,每個分區產生大約4TB(11e11條目)if我正確的計算,我有理由在flatMap中過濾相似性很低的文件, – zero323

回答

1

假設每個分區有10萬個報價,總大小爲40MB的11M個銷售,您的代碼會爲每個分區產生大約4TB的數據,所以您的員工不太可能處理這個問題,而且絕對不能在內存中完成。

我假設你只對近距離匹配感興趣,所以過早過濾是有意義的。簡化您的一些代碼(據我可以告訴有沒有理由使用mapPartitions):

// Check if match is close enough, where T is type of (q._1, s._1, (...)) 
def isCloseMatch(match: T): Boolean = ??? 

quotes.flatMap(q => sb.value 
    .map(s => (q._1, s._1, (....))) // Map as before 
    .filter(isCloseMatch) // yield only close matches 
) 

總論:

  • 從RDD創建廣播是昂貴的過程。首先,您必須將所有數據傳輸到驅動程序,然後分發給工作人員。它意味着重複存儲數據
  • 對於相對簡單的操作,這樣就可以使用高國家級星火SQL API是一個好主意的序列化/反序列化,網絡流量和費用:

    import org.apache.spark.sql.DataFrame 
    
    val salesDF: DataFrame = ??? 
    val salesDF: DataFrame = ??? 
    val featureCols: Seq[String] = ??? 
    val threshold: Int = ??? 
    
    val inds = featureCols // Boolean columns 
        .map(col => (quotesDF(col) === salesDF(col)).alias(s"${col}_ind")) 
    
    val isSimilar = inds // sum(q == s) > threshold 
        .map(c => c.cast("integer").alias(c.toString)) 
        .reduce(_ + _) 
        .geq(threshold) 
    
    val combined = quotesDF 
        .join(salesDF, isSimilar, "left") 
    
+0

非常感謝,我會給它一個提示,在得分之前做一些過濾有什麼好處嗎?是否會減少銷售數量如果已經被過濾了,比上面要檢查的更多嗎?比如'.filter(q.email == s.email).map(q._1,q._2 ......)' – owen79

+0

是的,雖然如果你有一些領域,你總是想用這個領域的Map來代替線性搜索。像'val sb = sc.broadcast(sales.map(s =>(s.email,s))。collectAsMap'。 – zero323