
Decoupling a non-serializable object to avoid a serialization error in Spark

The following class contains the main function, which tries to read from Elasticsearch and print the documents that are returned:

object TopicApp extends Serializable {

    def run() {

        val start = System.currentTimeMillis()

        val sparkConf = new Configuration()
        sparkConf.set("spark.executor.memory", "1g")
        sparkConf.set("spark.kryoserializer.buffer", "256")

        val es = new EsContext(sparkConf)
        val esConf = new Configuration()
        esConf.set("es.nodes", "localhost")
        esConf.set("es.port", "9200")
        esConf.set("es.resource", "temp_index/some_doc")
        esConf.set("es.query", "?q=*:*")
        esConf.set("es.fields", "_score,_id")

        val documents = es.documents(esConf)
        documents.foreach(println)

        val end = System.currentTimeMillis()
        println("Total time: " + (end - start) + " ms")

        es.shutdown()
    }

    def main(args: Array[String]) {
        run()
    }

}

The following class converts the returned documents to JSON using org.json4s:

class EsContext(sparkConf: HadoopConfig) extends SparkBase {
    private val sc = createSCLocal("ElasticContext", sparkConf)

    def documentsAsJson(esConf: HadoopConfig): RDD[String] = {
        implicit val formats = DefaultFormats
        val source = sc.newAPIHadoopRDD(
            esConf,
            classOf[EsInputFormat[Text, MapWritable]],
            classOf[Text],
            classOf[MapWritable]
        )
        val docs = source.map(
            hit => {
                val doc = Map("ident" -> hit._1.toString) ++ mwToMap(hit._2)
                write(doc)
            }
        )
        docs
    }

    def shutdown() = sc.stop()

    // documents() and mwToMap(), which converts a MapWritable to a Map, are omitted here

}

The following trait creates the application's local SparkContext:

trait SparkBase extends Serializable {
    protected def createSCLocal(name: String, config: HadoopConfig): SparkContext = {
        val iterator = config.iterator()
        for (prop <- iterator) {
            val k = prop.getKey
            val v = prop.getValue
            if (k.startsWith("spark."))
                System.setProperty(k, v)
        }
        val runtime = Runtime.getRuntime
        runtime.gc()

        val conf = new SparkConf()
        conf.setMaster("local[2]")

        conf.setAppName(name)
        conf.set("spark.serializer", classOf[KryoSerializer].getName)

        conf.set("spark.ui.port", "0")

        new SparkContext(conf)
    }
}

When I run TopicApp I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324) 
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:323) 
    at TopicApp.EsContext.documents(EsContext.scala:51) 
    at TopicApp.TopicApp$.run(TopicApp.scala:28) 
    at TopicApp.TopicApp$.main(TopicApp.scala:39) 
    at TopicApp.TopicApp.main(TopicApp.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.SparkContext, value: [email protected]) 
    - field (class: TopicApp.EsContext, name: sc, type: class org.apache.spark.SparkContext) 
    - object (class TopicApp.EsContext, [email protected]) 
    - field (class: TopicApp.EsContext$$anonfun$documents$1, name: $outer, type: class TopicApp.EsContext) 
    - object (class TopicApp.EsContext$$anonfun$documents$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
    ... 13 more 

Going through other posts that cover similar issues, most of them suggest either making the class Serializable or separating the non-serializable object from the class.

From the error I got, I infer that the SparkContext sc is not serializable, because SparkContext is not a serializable class.

How should I decouple the SparkContext so that the application runs correctly?

Answer


I cannot verify that your program works correctly, but the general rule is not to create anonymous functions that refer to members of a non-serializable class if those functions have to be executed on an RDD's data. In your case:

  • EsContext has a val of type SparkContext, which is (by design) not serializable
  • The anonymous function that EsContext.documentsAsJson passes to RDD.map calls another method of the same EsContext instance (mwToMap), which forces Spark to serialize that instance, along with the SparkContext it holds (see the sketch after this list)
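To make that capture explicit, here is a minimal sketch of the pattern with hypothetical names (Wrapper and tag are not from the original code); calling any instance method inside the lambda drags the whole enclosing instance into the closure:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical reduction of the problem: sc is a non-serializable field.
class Wrapper(private val sc: SparkContext) {
    private def tag(s: String): String = s.toUpperCase

    def run(rdd: RDD[String]): RDD[String] =
        // `tag(line)` is really `this.tag(line)`, so the closure captures the
        // whole Wrapper instance, including sc -> "Task not serializable".
        rdd.map(line => tag(line))
}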

One possible solution would be to move mwToMap out of the EsContext class and into EsContext's companion object (objects are effectively static, so they don't need to be serializable). If there are other methods of the same nature (write?), they would have to move as well. This would look something like:

import EsContext._ 

class EsContext(sparkConf:HadoopConfig) extends SparkBase { 
    private val sc = createSCLocal("ElasticContext", sparkConf) 

    def documentsAsJson(esConf: HadoopConfig): RDD[String] = { /* unchanged */ } 
    def documents(esConf: HadoopConfig): RDD[EsDocument] = { /* unchanged */ } 
    def shutdown() = sc.stop() 
} 

object EsContext { 
    private def mwToMap(mw: MapWritable): Map[String, String] = { ... } 
} 

If moving these methods out is not possible (i.e. if they need some of EsContext's members), then consider separating the class that performs the actual mapping from EsContext, which seems to be just a kind of wrapper around SparkContext (and if that is what it is, that is all it should be); a sketch of that separation follows.
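A minimal sketch of that separation, assuming the elasticsearch-hadoop and json4s imports implied elsewhere in the question (DocumentMapper is a hypothetical name, and the body of mwToMap is left out exactly as in the question):

import org.apache.hadoop.conf.{Configuration => HadoopConfig}
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.spark.rdd.RDD
import org.elasticsearch.hadoop.mr.EsInputFormat
import org.json4s.DefaultFormats
import org.json4s.native.Serialization.write

// All per-record logic lives in a standalone object, so the map closure
// never references EsContext and therefore never drags in its SparkContext.
object DocumentMapper extends Serializable {
    implicit val formats = DefaultFormats

    def toJson(id: Text, mw: MapWritable): String =
        write(Map("ident" -> id.toString) ++ mwToMap(mw))

    private def mwToMap(mw: MapWritable): Map[String, String] = ??? // as in the question
}

// EsContext is now nothing more than a thin wrapper around SparkContext.
class EsContext(sparkConf: HadoopConfig) extends SparkBase {
    private val sc = createSCLocal("ElasticContext", sparkConf)

    def documentsAsJson(esConf: HadoopConfig): RDD[String] =
        sc.newAPIHadoopRDD(
            esConf,
            classOf[EsInputFormat[Text, MapWritable]],
            classOf[Text],
            classOf[MapWritable]
        ).map { case (id, mw) => DocumentMapper.toJson(id, mw) }

    def shutdown() = sc.stop()
}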


I have added the complete files [here](https://github.com/apanimesh061/elastic-spark). You can have a look if you want –


I copied your code and it still doesn't compile; I guess there are some version differences... Anyway, I did what I could, hope it helps. Take it or leave it :) –


'write' is a function from 'org.json4s.native'. Would it make any difference then? –