2015-05-28 60 views
0

我正在讀取文本文件並在每次迭代中創建Json對象JsValues。我想在每次迭代時將它們保存到文件中。我正在使用Play Framework來創建JSON對象。將JSON寫入文件時發生序列化錯誤

class Cleaner { 
    def getDocumentData() = { 
    for (i <- no_of_files) { 
    .... do something ... 
     some_json = Json.obj("text" -> LARGE_TEXT) 
     final_json = Json.stringify(some_json) 
     //save final_json here to a file 
    } 
    } 
} 

我試着用PrintWriter以保存JSON但我正在逐漸Exception in thread "main" org.apache.spark.SparkException: Task not serializable的錯誤。

我該如何糾正?或者有沒有其他方法可以保存JsValue?

UPDATE:

,我讀了性狀serializable在這種情況下使用。我有以下功能:

class Cleaner() extends Serializable { 
    def readDocumentData() { 
    val conf = new SparkConf() 
     .setAppName("linkin_spark") 
     .setMaster("local[2]") 
     .set("spark.executor.memory", "1g") 
     .set("spark.rdd.compress", "true") 
     .set("spark.storage.memoryFraction", "1") 

    val sc = new SparkContext(conf) 

    val temp = sc.wholeTextFiles("text_doc.dat) 
    val docStartRegex = """<DOC>""".r 
    val docEndRegex = """</DOC>""".r 
    val docTextStartRegex = """<TEXT>""".r 
    val docTextEndRegex = """</TEXT>""".r 
    val docnoRegex = """<DOCNO>(.*?)</DOCNO>""".r 
    val writer = new PrintWriter(new File("test.json")) 

    for (fileData <- temp) { 
     val filename = fileData._1 
     val content: String = fileData._2 
     println(s"For $filename, the data is:") 
     var startDoc = false // This is for the 
     var endDoc = false // whole file 
     var startText = false // 
     var endText = false // 
     var textChunk = new ListBuffer[String]() 
     var docID: String = "" 
     var es_json: JsValue = Json.obj() 

     for (current_line <- content.lines) { 
     current_line match { 
      case docStartRegex(_*) => { 
      startDoc = true 
      endText = false 
      endDoc = false 
      } 
      case docnoRegex(group) => { 
      docID = group.trim 
      } 
      case docTextStartRegex(_*) => { 
      startText = true 
      } 
      case docTextEndRegex(_*) => { 
      endText = true 
      startText = false 
      } 
      case docEndRegex(_*) => { 
      endDoc = true 
      startDoc = false 
      es_json = Json.obj(
       "_id" -> docID, 
       "_source" -> Json.obj(
       "text" -> textChunk.mkString(" ") 
      ) 
      ) 
      writer.write(es_json.toString()) 
      println(es_json.toString()) 
      textChunk.clear() 
      } 
      case _ => { 
      if (startDoc && !endDoc && startText) { 
       textChunk += current_line.trim 
      } 
      } 
     } 
     } 
    } 
    writer.close() 
    } 
} 

這是功能,而我加入性狀,但仍然我得到相同的異常。 我重寫它的一個更小的版本:

def foo() { 
    val conf = new SparkConf() 
     .setAppName("linkin_spark") 
     .setMaster("local[2]") 
     .set("spark.executor.memory", "1g") 
     .set("spark.rdd.compress", "true") 
     .set("spark.storage.memoryFraction", "1") 
    val sc = new SparkContext(conf) 

    var es_json: JsValue = Json.obj() 
    val writer = new PrintWriter(new File("test.json")) 
    for (i <- 1 to 10) { 
     es_json = Json.obj(
     "_id" -> i, 
     "_source" -> Json.obj(
      "text" -> "Eureka!" 
     ) 
    ) 
     println(es_json) 
     writer.write(es_json.toString() + "\n") 
    } 
    writer.close() 
    } 

此功能工作正常,也沒有serializable。我無法理解發生了什麼?

回答

1

編輯:在手機上的第一個答案。

這並不是說必須是序列化的主類,但你在RDD處理循環在內部for (fileData <- temp) 這種情況下,它必須是可序列化,因爲火花數據在多個分區,它可能是在多臺計算機上使用的類。因此,您應用於此數據的功能需要可序列化,以便您可以將它們發送到其他計算機並行執行。 PrintWriter不能被序列化,因爲它指的是隻能從原始計算機上獲得的文件。因此,序列化錯誤。

在初始化火花過程的計算機上寫入數據。你需要把整個集羣中的數據拿到你的機器上然後寫下來。

要做到這一點,您可以收集結果。 rdd.collect(),並將從集羣中獲取所有數據並將其放入驅動程序線程內存中。然後您可以使用PrintWriter將其寫入文件。

這樣的:

temp.flatMap { fileData => 
    val filename = fileData._1 
    val content: String = fileData._2 
    println(s"For $filename, the data is:") 
    var startDoc = false // This is for the 
    var endDoc = false // whole file 
    var startText = false // 
    var endText = false // 
    var textChunk = new ListBuffer[String]() 
    var docID: String = "" 
    var es_json: JsValue = Json.obj() 

    var results = ArrayBuffer[String]() 

    for (current_line <- content.lines) { 
    current_line match { 
     case docStartRegex(_*) => { 
     startDoc = true 
     endText = false 
     endDoc = false 
     } 
     case docnoRegex(group) => { 
     docID = group.trim 
     } 
     case docTextStartRegex(_*) => { 
     startText = true 
     } 
     case docTextEndRegex(_*) => { 
     endText = true 
     startText = false 
     } 
     case docEndRegex(_*) => { 
     endDoc = true 
     startDoc = false 
     es_json = Json.obj(
      "_id" -> docID, 
      "_source" -> Json.obj(
      "text" -> textChunk.mkString(" ") 
     ) 
     ) 
     results.append(es_json.toString()) 
     println(es_json.toString()) 
     textChunk.clear() 
     } 
     case _ => { 
     if (startDoc && !endDoc && startText) { 
      textChunk += current_line.trim 
     } 
     } 
    } 
    } 

    results 
} 
.collect() 
.foreach(es_json => writer.write(es_json)) 

如果結果是你可以使用saveAsTextFile功能,將數據流的每個分區硬盤驅動器線程內存過大。在第二種情況下,作爲參數給出的路徑將被製作爲一個文件夾,並且您的rdd的每個分區都將被寫入其中的編號文件。

這樣的:

temp.flatMap { fileData => 
    val filename = fileData._1 
    val content: String = fileData._2 
    println(s"For $filename, the data is:") 
    var startDoc = false // This is for the 
    var endDoc = false // whole file 
    var startText = false // 
    var endText = false // 
    var textChunk = new ListBuffer[String]() 
    var docID: String = "" 
    var es_json: JsValue = Json.obj() 

    var results = ArrayBuffer[String]() 

    for (current_line <- content.lines) { 
    current_line match { 
     case docStartRegex(_*) => { 
     startDoc = true 
     endText = false 
     endDoc = false 
     } 
     case docnoRegex(group) => { 
     docID = group.trim 
     } 
     case docTextStartRegex(_*) => { 
     startText = true 
     } 
     case docTextEndRegex(_*) => { 
     endText = true 
     startText = false 
     } 
     case docEndRegex(_*) => { 
     endDoc = true 
     startDoc = false 
     es_json = Json.obj(
      "_id" -> docID, 
      "_source" -> Json.obj(
      "text" -> textChunk.mkString(" ") 
     ) 
     ) 
     results.append(es_json.toString()) 
     println(es_json.toString()) 
     textChunk.clear() 
     } 
     case _ => { 
     if (startDoc && !endDoc && startText) { 
      textChunk += current_line.trim 
     } 
     } 
    } 
    } 

    results 
} 
.saveAsTextFile("test.json") 
+0

我不能得到你。我正在嘗試處理'es_json'的類是'serializable'。 –

+0

是的但'PrinterWriter'不可序列化,並且不能被序列化。所以這個錯誤來自'for(data < - temp)'循環中的'writer' val。 –