2017-09-26 41 views
7

我在使用YARN作爲資源管理器和2個節點的EMR中運行Spark任務。如果我的條件不符合,我需要有目的地失敗該步驟,因此下一步不會按照配置執行。 爲了實現這一點,我在dynamoDB中插入日誌消息後拋出了一個自定義異常。Spark,在EMR中拋出SparkException時出現錯誤行爲

它運行良好,但Dynamo中的記錄插入了兩次。

以下是我的代碼。

if(<condition>) { 
    <method call to insert in dynamo> 
    throw new SparkException(<msg>); 
    return; 
} 

如果我刪除行以拋出異常,它工作正常,但步驟完成。

如何在不獲取日誌消息兩次的情況下使步驟失敗。

感謝您的幫助。

問候, Sorabh

回答

2

可能插入您的發電機消息的原因兩次是因爲您的錯誤條件被擊中,由兩個不同的遺囑執行人處理。斯帕克將工作中要完成的工作分開,這些工人不會分享任何知識。

我不確定驅動你的需求是否有Spark步驟失敗,但我會建議在你的應用程序代碼中跟蹤那個失敗案例,而不是試圖直接觸發死亡。換句話說,編寫檢測錯誤的代碼並將其傳遞給您的火花驅動程序,然後根據需要對其執行操作。

執行此操作的一種方法是使用累加器來計算處理數據時發生的任何錯誤。它會看起來大致是這樣的(我假設Scala和DataFrames,但您可以根據需要適應RDD的和/或Python):

val accum = sc.longAccumulator("Error Counter") 
def doProcessing(a: String, b: String): String = { 
    if(condition) { 
    accum.add(1) 
    null 
    } 
    else { 
    doComputation(a, b) 
    } 
} 
val doProcessingUdf = udf(doProcessing _) 

df = df.withColumn("result", doProcessing($"a", $"b")) 

df.write.format(..).save(..) // Accumulator value not computed until an action occurs! 

if(accum.value > 0) { 
    // An error detected during computation! Do whatever needs to be done. 
    <insert dynamo message here> 
} 

關於這種方法的一個好處是,如果你正在尋找反饋在Spark UI中,您可以在運行時看到累加器值。作爲參考,這裏是關於蓄電池的文件: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators