I want to build a class that takes a parser as a parameter and applies that parser to each line. Below is the simplest example you can paste into spark-shell. My generic parser class fails with `Task not serializable`.
import scala.util.{Success, Failure, Try}
import scala.reflect.ClassTag

class Reader[T : ClassTag](makeParser: () => (String => Try[T])) {
  def read(): Seq[T] = {
    val rdd = sc.parallelize(Seq("1", "2", "oops", "4")) mapPartitions { lines =>
      // Since making a parser can be expensive, we want to make only one per partition.
      val parser: String => Try[T] = makeParser()
      lines flatMap { line =>
        parser(line) match {
          case Success(record) => Some(record)
          case Failure(_) => None
        }
      }
    }
    rdd.collect()
  }
}

class IntParser extends (String => Try[Int]) with Serializable {
  // There could be an expensive setup operation here...
  def apply(s: String): Try[Int] = Try { s.toInt }
}
However, when I try to run something like `new Reader(() => new IntParser).read()` (which type-checks just fine), I get the dreaded `org.apache.spark.SparkException: Task not serializable` error about closures.

Why does the error occur, and is there a way to redesign the above to avoid it (while keeping `Reader` generic)?
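My best guess is that referring to `makeParser` inside `read()` really means `this.makeParser`, so the closure drags the whole (non-serializable) `Reader` instance in with it. A minimal sketch that seems to reproduce the same capture without Spark (the `Holder` class is hypothetical, just for illustration):

import java.io._

class Holder(f: () => Int) {
  // `f` is a constructor parameter used in a method body, so scalac stores it as a
  // field; the lambda below therefore references `this.f` and captures `this`.
  def closure: () => Int = () => f()
}

val out = new ObjectOutputStream(new ByteArrayOutputStream())
out.writeObject(new Holder(() => 1).closure)
// throws java.io.NotSerializableException: Holder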
Strange. The function only closes over `makeParser`, but `() => new IntParser` should be serializable. What happens if you pass `parser` itself as the argument instead of `makeParser`? – Alexey Romanov
@AlexeyRomanov If I just make `parser` the parameter of `Reader[T]`, I still get the same message (a slightly different trace, but still about closures). – Alec
@Alec - quick fix: move the line `val parser: String => Try[T] = makeParser()` before `val rdd =`. – Knight71
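For reference, Knight71's quick fix builds the parser once on the driver and ships it with the task, which works here because `IntParser` is `Serializable`, but it gives up the one-parser-per-partition goal. A sketch of an alternative that keeps per-partition construction, assuming the factory function itself is serializable (plain Scala function literals like `() => new IntParser` are): copy the constructor parameter into a local val, so the closure captures that val instead of `this`. As in the original snippet, this assumes the spark-shell `sc` is in scope.

import scala.util.Try
import scala.reflect.ClassTag

class Reader[T : ClassTag](makeParser: () => (String => Try[T])) {
  def read(): Seq[T] = {
    // Local copy: the closure below captures this val, not the Reader instance.
    val localMakeParser = makeParser
    val rdd = sc.parallelize(Seq("1", "2", "oops", "4")) mapPartitions { lines =>
      val parser: String => Try[T] = localMakeParser()  // still one parser per partition
      lines flatMap { line => parser(line).toOption }
    }
    rdd.collect()
  }
}

With this change, `new Reader(() => new IntParser).read()` should return `Seq(1, 2, 4)`, silently dropping the line that fails to parse.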