如果modulelookup
和brandlookup
都比較小,你可以將這些播放變量和使用映射如下:
val modulelookupBD = sc.broadcast(modulelookup.collectAsMap)
val brandlookupBD = sc.broadcast(brandlookup.collectAsMap)
def description(list:Array[String]): Array[String] = list.map(x => {
val module = modulelookupBD.value.getOrElse(x.take(4), "")
val brand = brandlookupBD.value.getOrElse(x.drop(4), "")
s"$module $brand"
})
val printRDD = outputRDD.map{case (xs, y) => (description(xs), y)}
如果不是有處理這種沒有效率的方式。您可以嘗試使用flatMap
,join
和groupByKey
,但對於任何大型數據集,此組合可能過於昂貴。
val indexed = outputRDD.zipWithUniqueId
val flattened = indexed.flatMap{case ((xs, _), id) => xs.map(x => (x, id))}
val withModuleAndBrand = flattened
.map(xid => (xid._1.take(4), xid))
.join(modulelookup)
.values
.map{case ((x, id), module) => (x.drop(4), (id, module))}
.join(brandlookup)
.values
.map{case ((id, module), brand) => (id, s"$module $brand")}
.groupByKey
val final = withModuleAndBrand.join(
indexed.map{case ((_, y), id) => (id, y)}
).values
使用DataFrame替換RDD可以減少樣板代碼,但性能仍然存在問題。
這是一個徹底的答案。謝謝。 – user1050325