火花

2016-02-24 185 views
7

造成內存溢出異常太多的地圖鍵我有形式RDD[(Vector[(Int, Byte)], Vector[(Int, Byte)])]這是一個PairRDD(key,value),其中關鍵是Vector[(Int, Byte)]和價值是Vector[(Int, Byte)]的RDD 'inRDD'火花

對於關鍵字段向量中的每個元素(Int, Byte)以及值字段向量中的每個元素(Int, Byte)我想在輸出RDD中獲得一個新的(鍵,值)對作爲(Int, Int), (Byte, Byte)

這應該給我一個形式爲RDD[((Int, Int), (Byte, Byte))]的RDD。

例如,inRDD內容也能像,

(Vector((3,2)),Vector((4,2))), (Vector((2,3), (3,3)),Vector((3,1))), (Vector((1,3)),Vector((2,1))), (Vector((1,2)),Vector((2,2), (1,2))) 

這將成爲

((3,4),(2,2)), ((2,3),(3,1)), ((3,3),(3,1)), ((1,2),(3,1)), ((1,2),(2,2)), ((1,1),(2,2)) 

我有下面的代碼。

val outRDD = inRDD.flatMap {           
    case (left, right) => 
    for ((ll, li) <- left; (rl, ri) <- right) yield { 
     (ll,rl) -> (li,ri) 
    } 
} 

它適用於向量在inRDD中時尺寸較小的情況。但是當向量中有很多元素時,我得到了out of memory exception。增加可用存儲器 的火花只能解決較小的輸入,並且對於更大的輸入又會出現錯誤。 看起來我正試圖在內存中組裝一個巨大的結構。我無法以任何其他方式重寫此代碼。

我已經實施了與java in hadoop類似的邏輯如下。

for (String fromValue : fromAssetVals) { 
    fromEntity = fromValue.split(":")[0]; 
    fromAttr = fromValue.split(":")[1]; 
    for (String toValue : toAssetVals) { 
     toEntity = toValue.split(":")[0]; 
     toAttr = toValue.split(":")[1]; 
     oKey = new Text(fromEntity.trim() + ":" + toEntity.trim()); 
     oValue = new Text(fromAttr + ":" + toAttr); 
     outputCollector.collect(oKey, oValue); 
    } 
} 

但是當我在spark中嘗試類似的東西時,我得到了嵌套的rdd異常。

我該如何有效地使用spark using scala

+0

你有沒有嘗試解決這個增加分區的數量? – BlackBear

+0

@BlackBear是的。但是這並沒有幫助。 – CRM

回答

2

好吧,如果笛卡爾乘積是你至少可以使它更有點懶的唯一選擇:

inRDD.flatMap { case (xs, ys) => 
    xs.toIterator.flatMap(x => ys.toIterator.map(y => (x, y))) 
} 

您還可以在星火水平

import org.apache.spark.RangePartitioner 

val indexed = inRDD.zipWithUniqueId.map(_.swap) 
val partitioner = new RangePartitioner(indexed.partitions.size, indexed) 
val partitioned = indexed.partitionBy(partitioner) 

val lefts = partitioned.flatMapValues(_._1) 
val rights = partitioned.flatMapValues(_._2) 

lefts.join(rights).values