I am reading text files and creating JSON objects (JsValues) on each iteration, and I want to save them to a file on each iteration. I am using the Play Framework to create the JSON objects. A serialization error occurs when writing the JSON to a file:
class Cleaner {
  def getDocumentData() = {
    for (i <- 1 to no_of_files) {
      // ... do something ...
      val some_json = Json.obj("text" -> LARGE_TEXT)
      val final_json = Json.stringify(some_json)
      // save final_json here to a file
    }
  }
}
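For context, here is a standalone sketch of the per-iteration write I have in mind, with no Spark involved (noOfFiles and the document text are placeholders):

import java.io.{File, PrintWriter}
import play.api.libs.json.Json

object SaveJsonDemo extends App {
  val noOfFiles = 3 // placeholder for the real file count
  val writer = new PrintWriter(new File("out.json"))
  for (i <- 1 to noOfFiles) {
    val someJson = Json.obj("text" -> s"large text for document $i") // placeholder body
    writer.println(Json.stringify(someJson)) // one JSON object per line
  }
  writer.close()
}

This works as expected on its own; the problem only appears once Spark enters the picture.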
I tried using a PrintWriter to save the JSON, but I am getting this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

How can I fix this? Or is there another way to save a JsValue?
UPDATE:

I read about the Serializable trait and used it in this case. I have the following function:
import java.io.{File, PrintWriter}

import org.apache.spark.{SparkConf, SparkContext}
import play.api.libs.json.{JsValue, Json}

import scala.collection.mutable.ListBuffer

class Cleaner() extends Serializable {
  def readDocumentData() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
    val sc = new SparkContext(conf)
    val temp = sc.wholeTextFiles("text_doc.dat")
    val docStartRegex = """<DOC>""".r
    val docEndRegex = """</DOC>""".r
    val docTextStartRegex = """<TEXT>""".r
    val docTextEndRegex = """</TEXT>""".r
    val docnoRegex = """<DOCNO>(.*?)</DOCNO>""".r
    val writer = new PrintWriter(new File("test.json"))
    for (fileData <- temp) {
      val filename = fileData._1
      val content: String = fileData._2
      println(s"For $filename, the data is:")
      var startDoc = false  // These flags track state
      var endDoc = false    // across the whole file
      var startText = false //
      var endText = false   //
      var textChunk = new ListBuffer[String]()
      var docID: String = ""
      var es_json: JsValue = Json.obj()
      for (current_line <- content.lines) {
        current_line match {
          case docStartRegex(_*) => {
            startDoc = true
            endText = false
            endDoc = false
          }
          case docnoRegex(group) => {
            docID = group.trim
          }
          case docTextStartRegex(_*) => {
            startText = true
          }
          case docTextEndRegex(_*) => {
            endText = true
            startText = false
          }
          case docEndRegex(_*) => {
            endDoc = true
            startDoc = false
            es_json = Json.obj(
              "_id" -> docID,
              "_source" -> Json.obj(
                "text" -> textChunk.mkString(" ")
              )
            )
            writer.write(es_json.toString())
            println(es_json.toString())
            textChunk.clear()
          }
          case _ => {
            if (startDoc && !endDoc && startText) {
              textChunk += current_line.trim
            }
          }
        }
      }
    }
    writer.close()
  }
}
This is the function to which I added the trait, but I am still getting the same exception. I rewrote a smaller version of it:
def foo() {
  val conf = new SparkConf()
    .setAppName("linkin_spark")
    .setMaster("local[2]")
    .set("spark.executor.memory", "1g")
    .set("spark.rdd.compress", "true")
    .set("spark.storage.memoryFraction", "1")
  val sc = new SparkContext(conf)
  var es_json: JsValue = Json.obj()
  val writer = new PrintWriter(new File("test.json"))
  for (i <- 1 to 10) {
    es_json = Json.obj(
      "_id" -> i,
      "_source" -> Json.obj(
        "text" -> "Eureka!"
      )
    )
    println(es_json)
    writer.write(es_json.toString() + "\n")
  }
  writer.close()
}
This function works fine, even without Serializable. I can't understand what is going on.
I don't follow you. The class that is trying to process 'es_json' is 'Serializable'. –
Yes, but 'PrintWriter' is not serializable, so it cannot be shipped with the task closure. The error comes from the 'writer' val being captured inside the 'for (data <- temp)' loop. –
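For example, here is a sketch of the usual workaround (not the exact code from this question): either collect the results back to the driver before opening the writer, or let Spark do the writing. Names like 'WriterFix' and 'json_out' are placeholders:

import java.io.{File, PrintWriter}

import org.apache.spark.{SparkConf, SparkContext}
import play.api.libs.json.Json

object WriterFix {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("linkin_spark").setMaster("local[2]"))

    // Build the JSON strings inside the RDD; this closure captures only
    // serializable things (no PrintWriter here).
    val jsonStrings = sc.wholeTextFiles("text_doc.dat").map { case (filename, content) =>
      Json.stringify(Json.obj("_id" -> filename, "_source" -> Json.obj("text" -> content)))
    }

    // Option 1: bring the results to the driver and write there.
    val writer = new PrintWriter(new File("test.json"))
    jsonStrings.collect().foreach(writer.println) // runs on the driver only
    writer.close()

    // Option 2: let Spark write one part-file per partition instead.
    // jsonStrings.saveAsTextFile("json_out")

    sc.stop()
  }
}

Note that collect() pulls everything into driver memory, so it only suits output that fits on one machine; saveAsTextFile scales but produces multiple part-files.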