可能插入您的發電機消息的原因兩次是因爲您的錯誤條件被擊中,由兩個不同的遺囑執行人處理。斯帕克將工作中要完成的工作分開,這些工人不會分享任何知識。
我不確定驅動你的需求是否有Spark步驟失敗,但我會建議在你的應用程序代碼中跟蹤那個失敗案例,而不是試圖直接觸發死亡。換句話說,編寫檢測錯誤的代碼並將其傳遞給您的火花驅動程序,然後根據需要對其執行操作。
執行此操作的一種方法是使用累加器來計算處理數據時發生的任何錯誤。它會看起來大致是這樣的(我假設Scala和DataFrames,但您可以根據需要適應RDD的和/或Python):
val accum = sc.longAccumulator("Error Counter")
def doProcessing(a: String, b: String): String = {
if(condition) {
accum.add(1)
null
}
else {
doComputation(a, b)
}
}
val doProcessingUdf = udf(doProcessing _)
df = df.withColumn("result", doProcessing($"a", $"b"))
df.write.format(..).save(..) // Accumulator value not computed until an action occurs!
if(accum.value > 0) {
// An error detected during computation! Do whatever needs to be done.
<insert dynamo message here>
}
關於這種方法的一個好處是,如果你正在尋找反饋在Spark UI中,您可以在運行時看到累加器值。作爲參考,這裏是關於蓄電池的文件: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators