2017-04-10 58 views
1

我想模擬容錯行爲。我寫了「硬」功能,不時失敗。例如:爲什麼Apache Spark不能重新提交失敗的任務?

def myMap(v: String) = { 
    // print task info and return "Ok" or throw exception 
    val context = TaskContext.get() 
    val r = scala.util.Random 
    val raise = r.nextBoolean() 
    println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise") 
    if (raise) 
    throw new RuntimeException("oh ;(") 
    "Ok" 
} 

由於星火具有容錯能力,我預計,失敗的任務會自動重新執行,但不會在未來的代碼

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkConf, SparkContext, TaskContext} 

object Example { 

    def main(args:Array[String]): Unit = { 
    Logger.getLogger("org").setLevel(Level.WARN) 
    Logger.getLogger("akka").setLevel(Level.WARN) 

    val conf = new SparkConf() 
     .setAppName("shuffle example") 
     .setMaster("local[*]") 
     .set("spark.task.maxFailures", "4") // it is default value 

    val sc = new SparkContext(conf) 


    val l:RDD[String] = sc.parallelize(List("a", "b", "c"), 3) 

    def myMap(v: String) = { 
     // print task info and return "Ok" or throw exception 
     val context = TaskContext.get() 
     val r = scala.util.Random 
     val raise = r.nextBoolean() 
     println(s"--- map $v in partition ${context.partitionId()} in stage ${context.stageId()} raise = $raise") 
     if (raise) 
     throw new Exception("oh ;(") 
     "Ok" 
    } 

    println (l.map(myMap).collect().mkString("\n")) // failed 

    sc.stop() 
    } 
} 

發生我在做什麼錯誤?

回答

0

實際上,spark在本地模式下不支持容錯功能。

在上面的例子中,如果設置爲獨立(或紗線)羣集的某個主服務器,生成jar文件並通過spark-submit運行,行爲將如預期的那樣:某些任務將失敗, submited。如果應用程序有一些單例(Scala中的對象),它將在失敗的任務中保持自己的狀態。

1

隨機變量「r」隨機返回true/false,與spark執行沒有任何關係。

當你初始化:

val r = scala.util.Random 
val raise = r.nextBoolean() 

加薪得到一個隨機產生真或假。所以你

if (raise) 
     throw new Exception("oh ;(") 
     "Ok" 

隨機工作。我不明白你想達到什麼目的。 我得到以下輸出

--- map a in partition 0 in stage 0 raise = true 
--- map b in partition 1 in stage 0 raise = false 

當我重新運行它,我得到

--- map a in partition 0 in stage 0 raise = false 

加薪是一個隨機生成的布爾值,所以有時它會失敗對B A的某個時候。

+0

是的,任務隨機執行或失敗。我預料,那火花會重新提交(重新執行)失敗的任務。 –

+1

由於內部故障,Spark不會因爲用戶拋出異常而重新提交您的任務。 –

+0

好吧,我試着拋出SparkException和RuntimeException,但結果是一樣的。其實我想模擬容錯行爲。我如何做到這一點? –

相關問題