Task not serializable error: Spark

I have an RDD of the form (String, (Int, Iterable[String])). For each entry in the RDD, the integer value (which I call distance) is initially set to 10. Every element of the Iterable[String] also has its own entry in this RDD, where it appears as the key (so we have a distance for each element of the Iterable[String] in a separate RDD entry). My intention is to do the following:
1. If the list (Iterable[String]) contains the element "Bethan", I assign that entry a distance of 1.
2. After that, I create a list of all keys with distance 1 by filtering.
3. After that, I transform the RDD into a new one, updating an entry's distance to 2 if any element in its own list has distance 1.
I have the following code:

// `loop` is defined elsewhere as a scala.util.control.Breaks instance
val disOneRdd = disRdd.map(x => if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x)
var lst = disRdd.filter(x => x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x => {
  var b: Boolean = false
  loop.breakable {
    for (str <- x._2._2) {
      if (lst.contains(str)) { // checks if the list contains an element with distance 1
        b = true
        loop.break
      }
    }
  }
  if (b)
    (x._1, (2, x._2._2))
  else
    (x._1, (10, x._2._2))
})

But when I run it, I get the error "Task not serializable". How can I do this, and is there a better way to do it?

EDIT

An input RDD of the form:

("abc",(10,List("efg","hij","klm"))) 
("efg",(10,List("jhg","Beethan","abc","ert"))) 
("Beethan",(0,List("efg","vcx","zse"))) 
("vcx",(10,List("czx","Beethan","abc"))) 
("zse",(10,List("efg","Beethan","nbh"))) 
("gvf",(10,List("vcsd","fdgd"))) 
... 

Every element that has Beethan in its list should get distance 1, and every element (other than Beethan) that has "an element with distance 1" in its list should get distance 2. The output format is:

("abc",(2,List("efg","hij","klm"))) 
("efg",(1,List("jhg","Beethan","abc","ert"))) 
("Beethan",(0,List("efg","vcx","zse"))) 
("vcx",(1,List("czx","Beethan","abc"))) 
("zse",(1,List("efg","Beethan","nbh")) 
("gvf",(10,List("vcsd","fdgd"))) 
... 

Error message:

[error] (run-main-0) org.apache.spark.SparkException: Task not serializable 
org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) 
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366) 
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
at org.apache.spark.rdd.RDD.map(RDD.scala:365) 
at Bacon$.main(Bacon.scala:86) 
at Bacon.main(Bacon.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
Caused by: java.io.NotSerializableException: scala.util.control.Breaks 
Serialization stack: 
- object not serializable (class: scala.util.control.Breaks, value: scala.util.control.Breaks@...) 
- field (class: Bacon$$anonfun$15, name: loop$1, type: class scala.util.control.Breaks) 
- object (class Bacon$$anonfun$15, <function1>) 

A small example (sample input and expected output) would help to understand what you are trying to achieve here – cheseaux


@cheseaux please see the edit – sarthak


@sarthak please add the stack trace - it is very useful and usually tells you which class caused the error –

Answer

A version that replaces the loop with a filter:

val disOneRdd = disRdd.map(x => if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x)
var lst = disRdd.filter(x => x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x => {
  // true if the entry's list contains any element with distance 1
  var b: Boolean = x._2._2.filter(y => lst.contains(y)).size > 0
  if (b)
    (x._1, (2, x._2._2))
  else
    (x._1, (10, x._2._2))
})
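
As an aside, the same membership test can be written with exists, which short-circuits on the first match (a minimal variant of my own, not from the original answer):

var b: Boolean = x._2._2.exists(y => lst.contains(y))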

And a version that keeps the loop, using breakable/break imported from the Breaks companion object:

import scala.util.control.Breaks._

val disOneRdd = disRdd.map(x => if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x)
var lst = disRdd.filter(x => x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x => {
  var b: Boolean = false
  breakable {
    for (str <- x._2._2) {
      if (lst.contains(str)) { // stop as soon as an element with distance 1 is found
        b = true
        break
      }
    }
  }
  if (b)
    (x._1, (2, x._2._2))
  else
    (x._1, (10, x._2._2))
})

Both versions work for me. The problem is the non-serializable loop.breakable: as the stack trace shows, the map closure captures the loop value as a field (loop$1), and its class scala.util.control.Breaks does not implement Serializable. The breakable and break imported from scala.util.control.Breaks._ are calls on the Breaks companion object, so no instance needs to be captured and the closure serializes fine. The version with filter may be slower, but it avoids the hassle with breakable.
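
To make the difference concrete, here is a minimal, self-contained sketch (my illustration, not part of the original answer; it assumes a local SparkContext):

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.control.Breaks

object BreaksDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("breaks-demo").setMaster("local[*]"))
    val nums = sc.parallelize(1 to 10)

    // Fails with "Task not serializable": `loop` is an instance of the
    // non-serializable Breaks class, and the map closure would capture it.
    // val loop = new Breaks
    // nums.map(n => { loop.breakable { if (n > 5) loop.break }; n }).count()

    // Works: `breakable`/`break` imported from the Breaks companion object
    // resolve to static module calls, so no instance is captured.
    import scala.util.control.Breaks.{breakable, break}
    val bigCount = nums.map { n =>
      var big = false
      breakable { if (n > 5) { big = true; break } }
      big
    }.filter(identity).count()
    println(bigCount) // prints 5
    sc.stop()
  }
}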

Apart from the main problem in the question, lst should really be a broadcast variable - I did not use one here to keep the answer as simple as possible.
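
For reference, a sketch of the broadcast version, keeping the same names as above (my addition; sc is assumed to be the SparkContext):

// Broadcast lst once, so every executor reuses a single read-only copy
// instead of receiving it inside each task's closure.
val lstBc = sc.broadcast(lst)
val disTwoRdd = disRdd.map(x => {
  if (x._2._2.exists(y => lstBc.value.contains(y)))
    (x._1, (2, x._2._2))
  else
    (x._1, (10, x._2._2))
})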


Thank you so much... it works :) – sarthak


Detailed analysis of Spark serialization: http://stackoverflow.com/questions/40818001/understanding-spark-serialization – KrazyGautam