2016-12-27 100 views
2

我想停止Spark重試Spark應用程序以防發生某些特定異常。如果滿足某些條件,我只想限制重試次數。否則,我需要默認的重試次數。如何控制每個任務/舞臺/作業嘗試的Spark應用程序?

請注意,Spark應用程序只運行一個Spark作業。

我試着在例外的情況下設置javaSparkContext.setLocalProperty("spark.yarn.maxAppAttempts", "1");,但仍然重試整個工作。

我提交星火應用程序如下:

spark-submit --deploy-mode cluster theSparkApp.jar 

我有,我想刪除,如果它是由同一作業的先前重試創建的輸出的使用情況,但未能如果工作輸出文件夾不是空的(在第1次重試中)。你能想出任何其他方式來實現這一目標嗎?

+0

你如何提交部署你的星火應用程序?什麼是使用的命令行選項和Spark屬性?順便說一句,即使你說你的意思是「整個Spark應用程序」,你仍然使用「整個工作仍在重試」。一個Spark應用程序可以運行/提交一個或多個Spark作業。 –

+0

您可否使用'spark-submit --deploy-mode cluster --conf spark.yarn.maxAppAttempts = 1'(並在命令行上使用Spark設置)。 –

回答

1

我有一個用例,如果輸出文件夾不是空的(在第一次重試時),那麼我想要刪除輸出,如果它是由同一作業的上一次重試創建的,但失敗。你能想出任何其他方式來實現這一目標嗎?

您可以使用TaskContext來控制你的星火工作的行爲方式給出,比如說,重試次數如下:

val rdd = sc.parallelize(0 to 8, numSlices = 1) 

import org.apache.spark.TaskContext 

def businessCondition(ctx: TaskContext): Boolean = { 
    ctx.attemptNumber == 0 
} 

val mapped = rdd.map { n => 
    val ctx = TaskContext.get 
    if (businessCondition(ctx)) { 
    println("Failing the task because business condition is met") 
    throw new IllegalArgumentException("attemptNumber == 0") 
    } 
    println(s"It's ok to proceed -- business condition is NOT met") 
    n 
} 
mapped.count 
+0

這裏的問題是,我不知道我的工作是因爲businessCondition()還是由於其他原因(除非我在Spark之外的某個地方保持此狀態,因此我想避免)而未能通過第一次重試。因此,我能想到的唯一可能的方式是在滿足businessCondition()的情況下強制Spark失敗,而不進行任何重試。 – user401445

+0

addTaskCompletionListener(listener:TaskCompletionListener):TaskContext'和'addTaskFailureListener(listener:TaskFailureListener):TaskContext'?以前從未使用過它們,但它們看起來好像在這裏可能會有所幫助。 –

+0

'onApplicationEnd'可以工作,但是無法在SparkListener中獲取TaskContext。我需要TaskContext來確定究竟要刪除什麼。另外,我不確定是否可以在偵聽器界面中查找應用程序是成功還是失敗。 – user401445

相關問題