2017-08-01 122 views
0

在項目中,我進行RDD內的行動和變革的一些副作用。我想測試一下,即使Spark引擎必須重試某些分區的計算,我的業務邏輯也能正常工作。 所以我試圖在計算過程中模擬失敗。測試火花的故障轉移

object Test extends App { 
    val conf = new SparkConf() 
    conf.setMaster("local[4]") 
    conf.setAppName(getClass.getName) 
    val sc = new SparkContext(conf) 
    try { 
    val data = sc.parallelize(1 to 10) 
    val res = data.map(n => { 
     if (TaskContext.get().attemptNumber() == 0 && n==5) { 
     sys.error("boom!") 
     } 
     n * 2 
    }).collect() 
    } 
    finally { 
    sc.stop() 
    } 
} 

但它不起作用:異常傳播到驅動程序。 似乎Spark試圖僅故障轉移其內部錯誤。 有什麼方法可以測試它嗎?

回答

0

我想說的是,你很可能使你程序中的錯誤,並與火花容錯計算機制的理解。

首先,請參閱所述scala doc特別是這個函數

def error(message: String): Nothing

拋出一個新的RuntimeException與所提供的消息

如果您在程序中插入此代碼,並且沒有捕獲異常聲明,請調用此函數將導致運行時異常並使當前進程終止

,這是不是與火花的容錯計算機制有關,這只是與過程,並在其上運行的操作系統無關!沒有catch的運行時異常會導致控制流(aka.current進程)被終止。

但是,什麼是火花的容錯計算機制?

它是關於火花的內部,它使用多重複制來保證計算安全,一個特定的火花應用程序的每個任務可能會崩潰,並有許多可能的原因,如網絡IO失敗。

而是通過使用一個聰明的特技,驅動節點維持RDDS的依賴關係的譜系。 然後任何分區(aka.task)可以使用驅動節點的沿襲從原始數據集重新計算。

它是關於應用程序(火花)的水平,而不是OS +處理水平。