2016-09-05 117 views
0

我的班級中有一張不可變的地圖。當我以本地模式運行我的代碼時,沒有問題,我可以訪問地圖中的每個鍵。但是,當我以集羣模式運行我的代碼時,節點會在映射中找不到鍵的錯誤。在集羣模式下使用地圖的火花

我到現在爲止所嘗試的是這些;

- 在集羣上廣播不可變映射。

broadcast = sc.broadcast(my_immutable_map) 

-Parallelize地圖作爲對RDD

my_map_rdd = sc.parallelize(my_immutable_map.toSeq) 

當我檢查日誌,我看到鑰匙未發現異常。 我的錯誤堆棧跟蹤如下:

Driver stacktrace: 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 15.0 failed 4 times, most recent failure: Lost task 1.3 in stage 15.0 (TID 25, datanode1.big.com): java.util.NoSuchElementException: key not found: 905053199731 
    at scala.collection.MapLike$class.default(MapLike.scala:228) 
    at scala.collection.AbstractMap.default(Map.scala:58) 
    at scala.collection.MapLike$class.apply(MapLike.scala:141) 
    at scala.collection.AbstractMap.apply(Map.scala:58) 
    at havelsan.CDRGenerator$.generate_random_target(CDRGenerator.scala:95) 
    at havelsan.CDRGenerator$$anonfun$main$2$$anonfun$6.apply(CDRGenerator.scala:167) 
    at havelsan.CDRGenerator$$anonfun$main$2$$anonfun$6.apply(CDRGenerator.scala:165) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1197) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

你能解釋一下如何分配火花地圖以及它是如何可能有些節點無法找到在此映射中的某些鍵,好嗎?順便說一句,我的火花版本是1.6.0

我錯過了什麼?

UPDATE

這部分是用於初始化駕駛員地圖。

... 
    var pd = sc.textFile("hdfs://...") 
    my_immutable_map = pd.map(line => line.split(":")).map{ line => (line(0), line(1).split(","))}.collectAsMap 
... 

    broadcast = sc.broadcast(my_immutable_map) 
    my_map_rdd = sc.parallelize(my_immutable_map.toSeq) 

這是我得到錯誤的部分。

def my_func(key:String):String={ 
... 
    my_value = broadcast.value(key) 
... 
} 

my_func在地圖內被調用爲;

my_another_rdd.map{ line => 
val key = line.split(",")(0) 
    my_func(key) 
} 
+0

spark版本?? – banjara

+0

我的火花版本是1.6.0 –

+0

請提供更多代碼。如果地圖相當小,第一種方法是正確的。 –

回答