Why does Apache Spark not re-submit failed tasks?

I want to simulate fault-tolerant behaviour, so I wrote a "hard" function that fails from time to time. For example:
def myMap(v: String) = {
  // print task info and return "Ok" or throw exception
  val context = TaskContext.get()
  val r = scala.util.Random
  val raise = r.nextBoolean()
  println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
  if (raise)
    throw new RuntimeException("oh ;(")
  "Ok"
}
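(A deterministic variant of the same idea, in case the coin flip is too noisy: fail only on the first attempt of each task. This is just a sketch; it relies on TaskContext.attemptNumber(), which returns 0 on the first attempt and increments on each retry, and the name myMapDeterministic is my own.)

def myMapDeterministic(v: String) = {
  val context = TaskContext.get()
  // fail the first attempt of every task; any re-submitted attempt succeeds
  if (context.attemptNumber() == 0)
    throw new RuntimeException(s"first attempt of $v fails ;(")
  "Ok"
}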
Since Spark is fault-tolerant, I expected the failed tasks to be re-executed automatically, but that does not happen with the following code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object Example {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setAppName("shuffle example")
      .setMaster("local[*]")
      .set("spark.task.maxFailures", "4") // this is the default value
    val sc = new SparkContext(conf)

    val l: RDD[String] = sc.parallelize(List("a", "b", "c"), 3)

    def myMap(v: String) = {
      // print task info and return "Ok" or throw exception
      val context = TaskContext.get()
      val r = scala.util.Random
      val raise = r.nextBoolean()
      println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise")
      if (raise)
        throw new Exception("oh ;(")
      "Ok"
    }

    println(l.map(myMap).collect().mkString("\n")) // fails
    sc.stop()
  }
}
What am I doing wrong?
Yes, the tasks randomly succeed or fail. I expected Spark to re-submit (re-execute) the failed tasks. –
Spark does not re-submit your task because the exception is thrown by your own code, not caused by an internal failure. –
OK, I tried throwing SparkException as well as RuntimeException, but the result is the same. What I actually want is to simulate fault-tolerant behaviour. How can I do that? –
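For anyone reading later, a minimal sketch of what I believe fixes this: in local mode the number of task attempts appears to come from the master string itself (local[N, F] / local[*, F]), while spark.task.maxFailures is effectively ignored, so enabling retries would look like the following (the limit of 4 attempts is an arbitrary choice):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RetryExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("retry example")
      // assumption: "local[*, 4]" = all cores, up to 4 attempts per task;
      // in local mode this, not spark.task.maxFailures, sets the retry limit
      .setMaster("local[*, 4]")
    val sc = new SparkContext(conf)

    def myMap(v: String) = {
      val raise = scala.util.Random.nextBoolean()
      println(s"--- map $v attempt ${TaskContext.get().attemptNumber()} raise = $raise")
      if (raise) throw new RuntimeException("oh ;(")
      "Ok"
    }

    // with retries enabled, a task that loses the coin flip is re-submitted;
    // the job aborts only if a single task fails 4 times in a row
    println(sc.parallelize(List("a", "b", "c"), 3).map(myMap).collect().mkString("\n"))
    sc.stop()
  }
}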