星火版本1.2.1 斯卡拉版本2.10.4實現一個歸併類似功能的火花與斯卡拉
我有2個,它們通過數字字段相關SchemaRDD:
RDD 1: (Big table - about a million records)
[A,3]
[B,4]
[C,5]
[D,7]
[E,8]
RDD 2: (Small table < 100 records so using it as a Broadcast Variable)
[SUM, 2]
[WIN, 6]
[MOM, 7]
[DOM, 9]
[POM, 10]
Result
[C,5, WIN]
[D,7, MOM]
[E,8, DOM]
[E,8, POM]
我想最大(場)來自RDD1,即< =來自RDD2的場。
我試圖接近這個使用合併:
排序由鍵(排序中的一組RDD將在該組中不超過100條記錄在上面的例子是組內)
執行類似mergesort的合併操作。在這裏,我需要跟蹤以前的值,以找到最大值;我仍然只遍歷一次。
由於這裏可能有太多的變量,我得到「任務不可序列化」異常。這個實現方法是否正確?我在這裏試圖避免笛卡爾積。有沒有更好的方法來做到這一點?
添加代碼 -
rdd1.groupBy(itm => (itm(2), itm(3))).mapValues(itmorg => {
val miorec = itmorg.toList.sortBy(_(1).toString)
for(r <- 0 to miorec.length) {
for (q <- 0 to rdd2.value.length) {
if ((miorec(r)(1).toString > rdd2.value(q).toString && miorec(r-1)(1).toString <= rdd2.value(q).toString && r > 0) || r == miorec.length)
org.apache.spark.sql.Row(miorec(r-1)(0),miorec(r-1)(1),miorec(r-1)(2),miorec(r-1)(3),rdd2.value(q))
}
}
}).collect.foreach(println)
'因爲有太多變量五月這裏我得到了「任務不可序列化」異常。「看起來可能是因爲其他原因造成太多變量的異常。你能發佈代碼嗎? –