下面是我用scala編寫的spark程序,用於查找給定單詞的字形。但是從測試用例執行時,程序失敗。Spark程序執行失敗,出現NotSerializableException
class Anagram {
def collectAnagrams(name: String,rdd : RDD[String]): RDD[String] = {
return rdd.flatMap(line => line.split("\\s+")).filter(x=>verifyAnagrams(x,name));
}
def verifyAnagrams(str1 : String, str2 : String): Boolean = {
if(str1.length != str2.length) {
return false;
}
val letters = Array.fill[Int](256)(0);
for(i <- 0 until str1.length) {
letters(str1.charAt(i).toInt)+=1;
letters(str2.charAt(i).toInt)-=1;
}
for(i <-0 until 256) {
if(letters(i) != 0) {
return false;
}
}
return true;
}
}
class AnagramTest extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
before {
val conf = new SparkConf().setMaster("local").setAppName("anagarm of string")
sc = new SparkContext(conf)
}
test("Anagram string check in a file") {
val anagramToken : String = "Tunring"
// @@ SETUP
val Anagram = new Anagram()
// @@ EXERCISE
val anagrams = Anagram.collectAnagrams(anagramToken,sc.textFile(getClass.getClassLoader.getResource("word_count_input.txt").getPath))
// @@ VERIFY
assert(anagrams.collect().toSet.size == 1)
}
}
當上述測試情況下,執行下面的異常發生
任務不可序列org.apache.spark.SparkException:任務不可序列在 org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)at org.apache.spark.rdd.RDD.filter(RDD.scala:303)at 個Anagram.collectAnagrams(Anagram.scala:10)** 斷言(anagrams.collect()== toSet集( 「Tunring」, 「Tunring」))
我想想確切知道根本原因以及以下內容。
- 從火花上下文中調用的每個類是否需要序列化?
- 是否每個定義的方法都需要序列化爲字節並通過節點發送?
- 被調用的RDD的封裝類是否需要序列化?
- 根據我的理解,轉換函數是通過節點發送的。所以這些方法將被序列化。那寫的新方法呢?
任何幫助表示讚賞。
[Task not serializable:java.io.NotSerializableException在僅在類而非關閉對象上調用函數外部時可能重複](http://stackoverflow.com/questions/22592811/task-not-serializable-java-io- notserializableexception-when-calling-function-ou) – sgvd