2016-02-02 31 views
0

星火版本1.2.1 斯卡拉版本2.10.4實現一個歸併類似功能的火花與斯卡拉

我有2個,它們通過數字字段相關SchemaRDD:

RDD 1: (Big table - about a million records) 
[A,3] 
[B,4] 
[C,5] 
[D,7] 
[E,8] 

RDD 2: (Small table < 100 records so using it as a Broadcast Variable) 
[SUM, 2] 
[WIN, 6] 
[MOM, 7] 
[DOM, 9] 
[POM, 10] 

Result  
[C,5, WIN] 
[D,7, MOM] 
[E,8, DOM] 
[E,8, POM] 

我想最大(場)來自RDD1,即< =來自RDD2的場。

我試圖接近這個使用合併:

  1. 排序由鍵(排序中的一組RDD將在該組中不超過100條記錄在上面的例子是組內)

  2. 執行類似mergesort的合併操作。在這裏,我需要跟蹤以前的值,以找到最大值;我仍然只遍歷一次。

由於這裏可能有太多的變量,我得到「任務不可序列化」異常。這個實現方法是否正確?我在這裏試圖避免笛卡爾積。有沒有更好的方法來做到這一點?

添加代碼 -

rdd1.groupBy(itm => (itm(2), itm(3))).mapValues(itmorg => { 
    val miorec = itmorg.toList.sortBy(_(1).toString) 
    for(r <- 0 to miorec.length) { 
    for (q <- 0 to rdd2.value.length) { 
     if ((miorec(r)(1).toString > rdd2.value(q).toString && miorec(r-1)(1).toString <= rdd2.value(q).toString && r > 0) || r == miorec.length) 
      org.apache.spark.sql.Row(miorec(r-1)(0),miorec(r-1)(1),miorec(r-1)(2),miorec(r-1)(3),rdd2.value(q)) 
     } 
    } 
    }).collect.foreach(println) 
+2

'因爲有太多變量五月這裏我得到了「任務不可序列化」異常。「看起來可能是因爲其他原因造成太多變量的異常。你能發佈代碼嗎? –

回答

0

我不會做一個全球性的排序。對於您所需要的,這是一項昂貴的操作。找到最大值肯定比獲得所有值的全局排序便宜。相反,請執行以下操作:

  1. 對於每個分區,建立一個結構,以保持RDD2上每行的RDD1的最大值。這可以使用mapPartitions和普通的scala數據結構來完成。你甚至可以在這裏使用你的一遍合併代碼。你應該得到像HashMap(WIN -> (C, 5), MOM -> (D, 7), ...)
  2. 一旦這是在每個執行器本地完成,合併這些結果數據結構應該是簡單的使用reduce

這裏的目標是儘可能做到沒有洗牌,保持最複雜的操作本地化,因爲您要的結果大小非常小(代碼中只需創建RDD1的所有有效鍵/值而RDD2則爲aggregateByKey,但效率較低)。

至於你例外,你woudl需要顯示的代碼,「任務不序列化」,通常意味着你在繞過封鎖哪些不是,嗯,序列化;-)

+0

嗨丹尼爾,我已經添加了代碼,你能給我一個代碼片和示例我怎樣才能解決這個使用分區? –

+0

不幸的是,我現在爲你構建代碼的時間很短,但[很容易找到示例](http://homepage.cs.latrobe.edu.au/zhe/ ZhenHeSparkRDDAPIExamples.html#mapPartitions) 。另外,請注意'groupBy'是一個非常緩慢的操作,因爲它將所有東西洗牌!瞄準本地'reduce'或'aggregateByKey'。構建一個普通的scala「def」,找到你尋找一個小集合的結果,然後在各個分區之間進行聚合。 –