Apache Spark: "SparkException: Task not serializable" in spark-shell with a manually built RDD

I have the following code to detect the most-used top-level domains in events. I use it to get the data through Spark SQL.

The function itself has been tested and works fine. I am using Amazon EMR and spark-shell. As soon as Spark sends a task to any node, I almost immediately get a very long stack trace with "SparkException: Task not serializable", but nothing specific in it. What is the deal here?

import scala.io.Source

// Download the public suffix list and keep only non-comment, non-empty lines
val suffixesStr =
    Source.fromURL("https://publicsuffix.org/list/public_suffix_list.dat").mkString
val suffList =
    suffixesStr.lines.filter(line => !line.startsWith("//") && line.trim() != "")
// collect() brings everything straight back, so this is a local Array[String], not an RDD
val suffListRDD = sc.parallelize(suffList.toList).collect()

val cleanDomain = (domain: String) => {
    // Check whether the domain ends with a known second-level suffix (e.g. "co.uk")
    val secLevelSuffix =
        suffListRDD.find(suffix => domain.endsWith("." + suffix) && suffix.contains("."))
    // Keep the last two labels, or the last three when a second-level suffix matched
    var regex = """[^.]+\.[^.]+$""".r
    if (!secLevelSuffix.isEmpty) {
        regex = """[^.]+\.[^.]+\.[^.]+$""".r
    }
    val cleanDomain = regex.findFirstMatchIn(domain).map(_ group 0)
    cleanDomain.getOrElse("")
}

val getDomain = (url: String) => {
    // Strip the scheme and a leading www/www1/www2/www3 prefix, keep the host part
    val domain = """(?i)^(?:(?:https?):\/\/)?(?:(?:www|www1|www2|www3)\.)?([^:?#/\s]+)""".r
        .findFirstMatchIn(url).map(_ group 1)
    var res = domain.getOrElse("").toLowerCase()
    if (res.contains("google.com")) {
        res = res.replace("google.com.br", "google.com")
    } else {
        res = cleanDomain(res)
    }
    res
}

sqlContext.udf.register("getDomain", getDomain) 
val domains = sqlContext.sql("SELECT count(*) c, domain from (SELECT getDomain(page_url) as domain FROM events) t group by domain order by c desc") 
domains.take(20).foreach(println) 

Answer

When you define things programmatically in the shell like in this case, don't forget to mark the values that should not be copied to the worker nodes as @transient.

In your case:

@transient val suffixesStr = ... 
@transient val suffList = ...
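
For completeness, here is a minimal sketch of how the fixed session could look, assuming a standard spark-shell where sc is already in scope (the suffixes and suffixesBc names are illustrative, not from the original code). The @transient annotations keep the driver-side intermediates out of the closures Spark serializes; suffList in particular is an Iterator, which is not serializable, and dragging it along with the captured REPL objects is what triggers the exception.

import scala.io.Source

// Driver-side intermediates; @transient keeps them out of the serialized closures
@transient val suffixesStr =
    Source.fromURL("https://publicsuffix.org/list/public_suffix_list.dat").mkString
@transient val suffList =
    suffixesStr.lines.filter(line => !line.startsWith("//") && line.trim() != "")

// A plain, serializable Array[String]; the parallelize(...).collect() round trip
// from the question is unnecessary, toArray on the iterator does the same job
val suffixes = suffList.toArray

// Optional: broadcast the list once instead of shipping it with every task
val suffixesBc = sc.broadcast(suffixes)

With the broadcast variant, cleanDomain would read the list through suffixesBc.value instead of referencing suffixes directly; either way, the non-serializable iterator no longer ends up in the task closure.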