2016-03-09 54 views
0

我花了一段時間才弄明白這一點,並且想分享我的解決方案。改進絕對受歡迎。拼合RDD - 鍵值對中的嵌套列表

參考文獻:Flattening a Scala Map in an RDDSpark Flatten Seq by reversing groupby, (i.e. repeat header for each sequence in it)

我有以下形式的RDD:RDD [(中間體,列表[(字符串,列表[(字符串,整型,浮點型)])])]

密鑰的:int

值:列表[(字符串,列表[(字符串,整型,浮點型)])]

隨着壓扁到的一個目標:RDD [(中等,字符串,字符串,整型,浮點型)]

binHostCountByDate.foreach(println) 

給人的例子:

(516361, List((2013-07-15, List((s2.rf.ru,1,0.5), (s1.rf.ru,1,0.5))), (2013-08-15, List((p.secure.com,1,1.0))))) 

最終RDD應給予以下

(516361,2013-07-15,s2.rf.ru,1,0.5) 
(516361,2013-07-15,s1.rf.ru,1,0.5) 
(516361,2013-08-15,p.secure.com,1,1.0) 

回答

1

這是一個簡單的一行(並與解構的換理解,我們可以比_1,_2._1等,這使得它更容易,以確保我們得到正確的結果

// Use a outer list in place of an RDD for test purposes 
val t = List((516361, 
       List(("2013-07-15", List(("s2.rf.ru,",1,0.5), ("s1.rf.ru",1,0.5))), 
         ("2013-08-15", List(("p.secure.com,",1,1.0)))))) 

t flatMap {case (k, xs) => for ((d, ys) <- xs; (dom, a,b) <-ys) yield (k, d, dom, a, b)} 
    //> res0: List[(Int, String, String, Int, Double)] = 
     List((516361,2013-07-15,s2.rf.ru,,1,0.5), 
      (516361,2013-07-15,s1.rf.ru,1,0.5), 
      (516361,2013-08-15,p.secure.com,,1,1.0)) 
更好的名字
0

我的方法如下:

我平坦化第一密鑰值對。這「刪除」第一個列表。

val binHostCountForDate = binHostCountByDate.flatMapValues(identity) 

給我以下形式的RDD:RDD [(智力,(字符串,列表[(字符串,整數,浮點數)])]

binHostCountForDate.foreach(println) 

(516361,(2013-07-15,List((s2.rf.ru,1,0.5), (s1.rf.ru,1,0.5)))) 
(516361,(2013-08-15,List(p.secure.com,1,1.0)) 

現在,我的前兩個項目映射到。一個元組創建一個新的密鑰和第二元組作爲值然後應用相同的步驟,上述變平關於新的密鑰值對

val binDataRemapKey = binHostCountForDate.map(f =>((f._1, f._2._1), f._2._2)).flatMapValues(identity) 

這給出了扁平RDD:RDD [(中間體,字符串) ,(String,Int,Float)]

如果這個表單沒有問題,那麼我們就完成了,但我們可以更進一步,刪除元組以獲得我們最初尋找的最終表單。

val binData = binDataRemapKey.map(f => (f._1._1, f._1._2, f._2._1, f._2._2, f._2._3)) 

這給我們的最終形式:RDD [(智力,字符串,字符串,整數,浮點數)

我們現在有一個扁平RDD已保存每個列表的父母。