在我們的應用程序中,我們的大部分代碼只適用於filter
,group by
和aggregate
操作DataFrame
並將DF保存到Cassandra數據庫。Spark SQL數據框 - 異常處理
像下面的代碼一樣,我們有幾種方法可以在不同數量的字段上執行相同類型的操作[filter, group by, join, agg
],並返回一個DF並將其保存到Cassandra表中。
示例代碼:
val filteredDF = df.filter(col("hour") <= LocalDataTime.now().getHour())
.groupBy("country")
.agg(sum(col("volume")) as "pmtVolume")
saveToCassandra(df)
def saveToCassandra(df: DataFrame) {
try {
df.write.format("org.apache.spark.sql.cassandra")
.options(Map("Table" -> "tableName", "keyspace" -> keyspace)
.mode("append").save()
}
catch {
case e: Throwable => log.error(e)
}
}
由於我打電話通過保存DF卡桑德拉的動作,我希望我需要處理只在該行按照this線程除外。
如果我收到任何異常,我可以在默認情況下在Spark詳細日誌中看到異常。
我一定要真正地圍繞過濾,按代碼Try
或try , catch?
我沒有看到有異常處理星火SQL數據幀API實例的任何實例。
如何在saveToCassandra
方法上使用Try
?它返回Unit