我有大小6000的查找RDD內RDD訪問,lookup_rdd:RDD [字符串]火花:另一個RDD
a1 a2 a3 a4 a5 .....
和另一個RDD,data_rdd:RDD [(字符串,可迭代[(字符串, INT)])]:(ID,(項目,計數)),它具有唯一的ID,在lookup_rdd
(id1,List((a1,2), (a3,4))) (id2,List((a2,1), (a4,2), (a1,1))) (id3,List((a5,1)))
FOREACH元素我要檢查每個ID是否有該元素與否,如果它的存在我把計數,如果不是我把0,和存儲在一個文件中。
什麼是實現這一目標的有效方法。哈希可能嗎?例如。輸出我想要的是:
id1,2,0,4,0,0 id2,1,1,0,2,0 id3,0,0,0,0,1
我已經試過這樣:
val headers = lookup_rdd.zipWithIndex().persist()
val indexing = data_rdd.map{line =>
val id = line._1
val item_cnt_list = line._2
val arr = Array.fill[Byte](6000)(0)
item_cnt_list.map(c=>(headers.lookup(c._1),c._2))
}
indexing.collect().foreach(println)
我得到異常:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations
6000 entires是一個非常小的數據集。考慮收集驅動程序,然後廣播 –