2017-05-15 196 views
0

我有大小6000的查找RDD內RDD訪問,lookup_rdd:RDD [字符串]火花:另一個RDD

a1 a2 a3 a4 a5 .....

和另一個RDD,data_rdd:RDD [(字符串,可迭代[(字符串, INT)])]:(ID,(項目,計數)),它具有唯一的ID,在lookup_rdd

(id1,List((a1,2), (a3,4))) (id2,List((a2,1), (a4,2), (a1,1))) (id3,List((a5,1)))

FOREACH元素我要檢查每個ID是否有該元素與否,如果它的存在我把計數,如果不是我把0,和存儲在一個文件中。

什麼是實現這一目標的有效方法。哈希可能嗎?例如。輸出我想要的是:

id1,2,0,4,0,0 id2,1,1,0,2,0 id3,0,0,0,0,1

我已經試過這樣:

val headers = lookup_rdd.zipWithIndex().persist() 
val indexing = data_rdd.map{line => 
    val id = line._1 
    val item_cnt_list = line._2 
    val arr = Array.fill[Byte](6000)(0) 
    item_cnt_list.map(c=>(headers.lookup(c._1),c._2)) 
    } 
indexing.collect().foreach(println) 

我得到異常:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations

+0

6000 entires是一個非常小的數據集。考慮收集驅動程序,然後廣播 –

回答

1

壞消息是,你不能使用RDD在另一個。

好消息是,對於您的使用情況,假設6000個條目相當小,則存在一個理想的解決方案:在驅動程序上收集RDD,將其廣播回集羣的每個節點,並在其他RDD,就像你之前做過的那樣。

val sc: SparkContext = ??? 
val headers = sc.broadcast(lookup_rdd.zipWithIndex.collect().toMap) 
val indexing = data_rdd.map { case (_, item_cnt_list) => 
    item_cnt_list.map { case (k, v) => (headers.value(k), v) } 
} 
indexing.collect().foreach(println) 
+0

感謝您的答案。有一個類似的情況,但另外..有更新地圖功能裏面的查找表。對於下一個元素,我必須查找更新的查找表。我明白,我們不能用broadcast來做這件事。請你建議如何解決這個問題。即使鏈接到資源也會有所幫助。提前致謝。 – Phoenix

+0

我相信你有一個更好的改變,爲你的特殊情況創建一個問題,分享相關的代碼。沒有它,很難說。 – stefanobaghino

+0

已經添加了一個單獨的問題:請你看看。 :HTTPS://stackoverflow.com/questions/49125735/loop-through-dataframe-and-update-the-lookup-table-simultaneously-spark-scala – Phoenix