2016-12-10 112 views
-1

在我們的應用程序中,我們的大部分代碼只適用於filter,group byaggregate操作DataFrame並將DF保存到Cassandra數據庫。Spark SQL數據框 - 異常處理

像下面的代碼一樣,我們有幾種方法可以在不同數量的字段上執行相同類型的操作[filter, group by, join, agg],並返回一個DF並將其保存到Cassandra表中。

示例代碼:

val filteredDF = df.filter(col("hour") <= LocalDataTime.now().getHour()) 
.groupBy("country") 
.agg(sum(col("volume")) as "pmtVolume") 

saveToCassandra(df) 

def saveToCassandra(df: DataFrame) { 
    try { 
     df.write.format("org.apache.spark.sql.cassandra") 
     .options(Map("Table" -> "tableName", "keyspace" -> keyspace) 
     .mode("append").save() 
    } 
    catch { 
     case e: Throwable => log.error(e) 
    } 
} 

由於我打電話通過保存DF卡桑德拉的動作,我希望我需要處理只在該行按照this線程除外。

如果我收到任何異常,我可以在默認情況下在Spark詳細日誌中看到異常。

我一定要真正地圍繞過濾,按代碼Trytry , catch?

我沒有看到有異常處理星火SQL數據幀API實例的任何實例。

如何在saveToCassandra方法上使用Try?它返回Unit

回答

0

你並不真的需要圍繞filtergroup by代碼Trytrycatch。因爲所有這些操作都是轉換,所以在執行動作之前,它們不會執行,就像saveToCassandra一樣。

然而,如果在過濾分組聚集數據幀中發生錯誤,在saveToCassandra功能catch子句將作爲正在執行有動作記錄它。

0

在try catch中沒有必要包裝懶惰的DAG。
你需要在Try()中包裝lambda函數。
不幸的是,AFAIK沒有辦法在DataFrame中進行行級異常處理。

您可以使用RDD或數據集如下 spache spark exception handling

在答覆中提到這個職位