Spark 2.1.1(scala api)從s3位置流式傳輸json文件。Spark Streaming dropDuplicates
我想基於每個記錄的json中找到的ID列(「event_id」)來刪除任何傳入記錄。我不在乎保留哪條記錄,即使記錄的重複只是部分。我使用追加模式,因爲數據只是通過spark.sql()方法進行了豐富/過濾,沒有通過/窗口聚合進行分組。然後,我使用追加模式將實木複合地板文件寫入s3。
根據文檔,我應該能夠使用不帶水印的dropDuplicates進行重複數據刪除(顯然這在長時間運行的生產中無效)。然而,這種失敗,出現錯誤:
用戶類拋出異常:org.apache.spark.sql.AnalysisException:當有在流DataFrames /數據集流聚合不支持追加輸出模式
該錯誤似乎很奇怪因爲我沒有進行聚合(除非dropDuplicates或sparkSQL算作聚合?)。
我知道重複不會發生在彼此的3天之外,所以我通過添加一個水印(通過緊接在重複之前使用.withWatermark())再次嘗試它。但是,似乎要等到寫入數據之前的3天。 (即從今天7月24日起,只有截至7月21日同一時間的數據才寫入輸出)。
由於沒有聚合,我希望在處理批處理後立即寫入每一行,並簡單地丟棄任何具有在前3天內發生的事件標識的行。有沒有簡單的方法來實現這一點?
感謝