2016-06-08 62 views
1

我想構造一個接收解析器作爲參數的類,並在每一行使用這個解析器。以下是您可以粘貼到spark-shell的最簡單示例。通用解析器類`任務不可序列化'

import scala.util.{Success,Failure,Try} 
import scala.reflect.ClassTag 

class Reader[T : ClassTag](makeParser:() => (String => Try[T])) { 

    def read(): Seq[T] = { 

    val rdd = sc.parallelize(Seq("1","2","oops","4")) mapPartitions { lines => 

     // Since making a parser can be expensive, we want to make only one per partition. 
     val parser: String => Try[T] = makeParser() 

     lines flatMap { line => 
     parser(line) match { 
      case Success(record) => Some(record) 
      case Failure(_) => None 
     } 
     } 
    } 

    rdd.collect() 
    } 
} 

class IntParser extends (String => Try[Int]) with Serializable { 
    // There could be an expensive setup operation here... 
    def apply(s: String): Try[Int] = Try { s.toInt } 
} 

然而,當我嘗試運行像new Reader(() => new IntParser).read()(哪種類型的檢查就好了),我得到了可怕的錯誤org.apache.spark.SparkException: Task not serializable有關關閉。

爲什麼會出現錯誤,並且有沒有辦法重新設計上述以避免這種情況(同時保留Reader通用)?

+0

奇怪。該函數只關閉'makeParser',但是'()=>新的IntParser'應該是可序列化的。如果用'parser'作爲參數替換傳遞的'makeParser'會發生什麼? –

+0

@AlexeyRomanov如果我只是把'parser'作爲'Reader [T]'的參數,我仍然會得到相同的消息(稍微不同的跟蹤,但仍然與閉包有關) – Alec

+0

@Alec - 快速修復移動行解析器:String =>在val rdd =之前嘗試[T] = makeParser().. – Knight71

回答

2

問題是,makeParser是可變的class Reader,因爲你在rdd轉換中使用它,spark會嘗試序列化整個不可序列化的類Reader。所以你會得到任務而不是可序列化的異常。

將Serializable添加到類Reader中將與您的代碼一起工作。但是這不是一個好的做法,因爲它會序列化可能不需要的整個類變量。

一般而言,您可以使用函數而不是方法來避免序列化問題。因爲在scala中函數實際上是對象,它將被序列化。

參考這樣的回答: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

+1

備用修正: 1.使'makeParser'成爲'read'而不是'Reader'的參數。 2.更改'read'以將'makeParser'函數存儲在局部變量中:'val makeParser0 = makeParser; ... val parser:String =>嘗試[T] = makeParser0()...'。 –

+0

啊!第一段解釋它 - 現在一切都合情合理。 @AlexeyRomanov我可能會最終做到這一點! – Alec

相關問題