If I understand correctly, your problem boils down to this:
val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
((0, 0), 5.5),
((1, 0), 7.7)
))
val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
(123, Array((0, 0), (1, 0)))
))
and you want to combine these RDDs to get every (index, (x, y), v) triple, which in this case would be (123, (0,0), 5.5) and (123, (1,0), 7.7)?
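To pin down the target output, here is a sketch of the same computation on plain in-memory Scala collections (no Spark), using the sample data above. It only specifies the expected triples. It is not a substitute for the RDD version, because it assumes all the data fits on one machine. The names `values`, `indices`, `valueByCoord` and `expected` are illustrative.

```scala
// Same sample data as above, as local collections instead of RDDs
val values: Seq[((Int, Int), Double)] = Seq(((0, 0), 5.5), ((1, 0), 7.7))
val indices: Seq[(Int, Array[(Int, Int)])] = Seq((123, Array((0, 0), (1, 0))))

// Look up each coordinate's value; coordinates without a value are skipped
val valueByCoord: Map[(Int, Int), Double] = values.toMap
val expected: Seq[(Int, (Int, Int), Double)] = for {
  (index, coords) <- indices
  xy <- coords.toSeq
  v <- valueByCoord.get(xy)
} yield (index, xy, v)
```

Skipping coordinates with no matching value mirrors the inner-join semantics of RDD.join.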
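You can definitely do this with a join, since both RDDs share a common column (x, y). But because one of them actually holds an Array[(x, y)], you have to explode it into a set of rows first: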
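As a sketch of what the explode step does to each row, here it is on a plain local Seq (RDD.flatMap applies the same kind of function to every element of the RDD). The generic `explode` helper is a hypothetical name, not part of Spark. Leaving the key and element types as type parameters is one way to avoid hard-coding Array[(Int, Int)] versus Array[(Long, Long)].

```scala
// Hypothetical helper: turn each (key, Array[item]) row into one (key, item) row per element
def explode[K, A](rows: Seq[(K, Array[A])]): Seq[(K, A)] =
  rows.flatMap { case (key, items) => items.toSeq.map(item => (key, item)) }

// One input row with two coordinates becomes two output rows
val exploded: Seq[(Int, (Int, Int))] = explode(Seq((123, Array((0, 0), (1, 0)))))
```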
val explodedIndices = indicesRdd.flatMap { case (index, coords: Array[(Int, Int)]) =>
  coords.map { case (x, y) => (index, (x, y)) }
}
// Each row exploded into multiple rows (index, (x, y))
val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)
val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)
// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)
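The join above still yields nested tuples of the form ((x, y), ((index, (x, y)), ((x, y), v))), so one more map is needed to get the requested (index, (x, y), v) triples. Below is a minimal end-to-end sketch, assuming Spark is on the classpath. Because valuesRdd is already a pair RDD keyed by (x, y), it skips keyBy and keys the exploded indices by coordinates directly. The application name "coords-join" is an arbitrary placeholder.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Reuses an existing context (e.g. in spark-shell) or starts a local one
val sc = SparkContext.getOrCreate(new SparkConf().setAppName("coords-join").setMaster("local[*]"))

val valuesRdd: RDD[((Int, Int), Double)] = sc.parallelize(Seq(((0, 0), 5.5), ((1, 0), 7.7)))
val indicesRdd: RDD[(Int, Array[(Int, Int)])] = sc.parallelize(Seq((123, Array((0, 0), (1, 0)))))

// Explode and key by coordinates in one pass: ((x, y), index)
val indicesByCoord: RDD[((Int, Int), Int)] =
  indicesRdd.flatMap { case (index, coords) => coords.map(xy => (xy, index)) }

// join yields ((x, y), (index, v)); reshape into (index, (x, y), v)
val result: RDD[(Int, (Int, Int), Double)] =
  indicesByCoord.join(valuesRdd).map { case (xy, (index, v)) => (index, xy, v) }

// Collect the (small) result to the driver, sorted by coordinates for a stable order
val rows = result.collect().sortBy(_._2)
// Outside spark-shell, call sc.stop() once you are done with the context
```

If you keep the keyBy version, the equivalent reshape is joined.map { case (_, ((index, xy), (_, v))) => (index, xy, v) }. Keying by ((x, y), index) simply avoids carrying a redundant copy of (x, y) inside each value.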
Thanks. It throws an error on the '_' in the flatMap statement: 'missing parameter type for expanded function...' – EdgeRover
OK, it works with a small modification: val explodedIndices = qual.flatMap { case (index, coords: Array[(Long, Long)]) => coords.map { case (x, y) => (index, (x, y)) } } Thanks. – EdgeRover
Great! I've fixed the answer; I hadn't actually tried running it. – spiffman