2016-10-24 76 views
-2

我想提高我的星火代碼:優化星火代碼

var lst = disOneRDDM.filter(x=> x._2._1 == 1).keys.collect 
val disTwoRDDM = disOneRDDM.map(x=> { 
            var b:Boolean = false 
            breakable{ 
            for (str <- x._2._2) 
             if (lst.contains(str)) 
              {b = true 
              break} 
            } 
            if (b) 
             (x._1,(Math.min(2,x._2._1),x._2._2)) 
            else 
             x 
            }).cache 

我有形式的RDD的(字符串,(智力,列表[字符串]))。 List[String]中的每個元素在此RDD中都有其自己的條目,可用作關鍵字。樣品輸入如下所示(這是disOneRDDM在我的代碼):

("abc",(10,List("hij","efg","klm"))) 
("efg",(1,List("jhg","Beethan","abc","ert"))) 
("Beethan",(0,List("efg","vcx","zse"))) 
("vcx",(1,List("czx","Beethan","abc"))) 
("zse",(1,List("efg","Beethan","nbh"))) 
("hij",(10,List("vcx","klm","zse"))) 
("jhg",(10,List("ghb","cdz","awq","swq"))) 
... 

我的意圖是在每個List[String]其中具有1的Int值和元件的查找並改變其自己的Intmin(2,current_Int_value) 。例如,在輸入代碼中,條目​​具有包含"efg"的列表,其具有Int值1,並且條目"hij"具有"vcx"。所以,我希望形式的輸出:

("abc",(2,List("hij","efg","klm"))) 
("efg",(1,List("jhg","Beethan","abc","ert"))) 
("Beethan",(0,List("efg","vcx","zse"))) 
("vcx",(1,List("czx","Beethan","abc"))) 
("zse",(1,List("efg","Beethan","nbh"))) 
("hij",(2,List("vcx","klm","zse"))) 
("jhg",(10,List("ghb","cdz","awq","swq"))) 
... 

的RDD的規模是巨大的,我做了它的工作方式,但速度很慢。在上面的代碼中,我試圖過濾具有Int值1的RDD並通過收集它們來形成列表lst。然後,爲了找到值爲2的元素,我遍歷元素的列表條目並檢查列表lst是否包含條目。如果是這樣,我打破循環並分配適當的Int值。
有沒有更快的方法來做到這一點,例如,不必收集列表中的巨大RDD?

+0

LST是來自一個列表RDD,所以我認爲它有點大。你正在做的是對disOneRDDM中的每一行進行迭代。此外,對於映射中的每個單獨分區,該列表都不會被廣播,因此該列表將被序列化並分發。我會將lst轉換爲Set並播放它。 –

回答

2

作爲一個@ - spoty現場評論說,如果沒有太多lst獨特價值觀 - 你最好的辦法是將其更改爲Set(其中刪除重複),並使用廣播。

否則(如果該唯一鍵的列表仍然可能很大) - 這是一個根本不使用collect的解決方案,這意味着它可以處理任何大小。但是,由於它通過使用flatMap增加了RDD的大小,並執行了join(這需要進行洗牌),所以我不確定它會更快,這取決於數據和羣集的具體情況。

// create the lookup "map" (the int values are actually irrelevant, we just need the keys) 
val lookup: RDD[(String, Int)] = disOneRDDM.cache().filter(_._2._1 == 1).map(t => (t._1, 1)) 

val result = disOneRDDM 
    .flatMap { // break up each record into individual records for join 
    case (k, (i, list)) => list.map(s => (s, (k, i))) 
    } 
    .leftOuterJoin(lookup).map { // left join with lookup and change int values if we found a match 
    case (item, ((k, i), Some(_))) => (k, (Math.min(2, i), item)) 
    case (item, ((k, i), _)) => (k, (i, item)) 
    } 
    .groupByKey().map { // group by key to merge back to lists, while mapping to the desired structure 
    case (k, iter) => 
     val l = iter.toList 
     (k, (l.map(_._1).min, l.map(_._2))) 
    } 

result.foreach(println) 
// (Beethan,(0,List(zse, efg, vcx))) 
// (jhg,(10,List(cdz, swq, ghb, awq))) 
// (hij,(2,List(klm, zse, vcx))) 
// (zse,(1,List(Beethan, nbh, efg))) 
// (efg,(1,List(Beethan, jhg, abc, ert))) 
// (vcx,(1,List(Beethan, czx, abc))) 
// (abc,(2,List(klm, hij, efg))) 
+0

使用Dataframe添加了另一個答案,我發現它有點乾淨,但都是有效的。 –

+0

只是一個簡單的問題,如果我使用單個節點進行計算,會不會播出幫助? – sarthak

+0

是的,我認爲是這樣 - 當不使用廣播時,數據將按序列化並按_task_發送,因此即使使用單個節點,您仍可能會支付更高的開銷。 –

1

如果你願意使用Dataframes API,而不是RDDS的 - 這裏有可能簡化代碼位(並提高性能)的另一種選擇:

// UDF to check if string contained in array - will be used for the join 
val arrayContains = udf { (a: mutable.WrappedArray[String], s: String) => a.contains(s) } 

// create Dataframe from RDD and create the filtered lookupDF 
val df = disOneRDDM.map {case (k, (v, l)) => (k, v, l) }.toDF("key", "val", "list").cache() 
val lookupDf = df.filter($"val" === 1).select($"key" as "match") 

// join, groupBy to remove the duplicates while collecting non-null matches, and perform transformation on "val" 
val resultDF = df 
.join(lookupDf, arrayContains($"list", $"match"), "leftouter") 
.groupBy($"key").agg(
    first("val") as "val", 
    first("list") as "list", 
    first("match", ignoreNulls = true) as "match") 
.selectExpr("key", "IF(match IS NULL OR val < 2, val, 2) as val", "list") 

resultDF.show() 
// +-------+---+--------------------+ 
// | key|val|    list| 
// +-------+---+--------------------+ 
// | zse| 1| [efg, Beethan, nbh]| 
// | efg| 1|[jhg, Beethan, ab...| 
// | hij| 2|  [vcx, klm, zse]| 
// |Beethan| 0|  [efg, vcx, zse]| 
// | vcx| 1| [czx, Beethan, abc]| 
// | abc| 2|  [hij, efg, klm]| 
// | jhg| 10|[ghb, cdz, awq, swq]| 
// +-------+---+--------------------+