2017-04-25 70 views
1

我有這段代碼:映射一個RDD的值,以他們的字典中的值

List tmp = colRDD.collect(); 
int ctr = 0; 
for(Object o : tmp){ 
    if (!dictionary.containsKey(o)) { 
     dictionary.put(o, ctr++); 
    } 
} 
revDictionary = dictionary.entrySet().stream() 
     .collect(Collectors.toMap(Entry::getValue, c -> c.getKey())); 
colRDD = colRDD.map(x -> {return dictionary.get(x);}); 

一開始,我兌現了RDD,並把每個值在哈希表,其中RDD值是關鍵。 然後,我簡單的想在RDD每個值映射到他們的字典值。 不過,我得到一個Task not serializable錯誤。這是爲什麼 ?

回答

3

這將通過試圖訪問一個變量作用域到驅動器,從內,其由執行程序代碼評估引起。

鑑於你的示例代碼,最有可能的罪魁禍首是在這行代碼dictionary

colRDD = colRDD.map(x -> {return dictionary.get(x);}); 

但是這個問題也可以從另一個在你的代碼比你在這裏提供的到來,所以你可能還需要檢查。

這樣做的原因是因爲dictionary駐留在你的驅動程序,這很可能是在一個單獨的JVM實例比你的遺囑執行人運行內存。您傳遞給colRDD.map的lambda由執行者評估,而非司機。該函數被序列化爲要執行的任務,併發送給執行器以便運行。但是Spark引擎無法連續執行任務,因爲dictionary的「關閉」,因此是例外。

+0

我知道錯誤來自那裏。不過,爲什麼不這樣做。 – SpiderRico

+0

增加了更多的細節來幫助理解發生的事情。 – ImDarrenG

+0

@ImDarrenG答案是正確的,但是如果每次調用地圖時使用廣播都不發送字典將會很好 –