2017-07-24 180 views
0

Spark 2.1.1(scala api)從s3位置流式傳輸json文件。Spark Streaming dropDuplicates

我想基於每個記錄的json中找到的ID列(「event_id」)來刪除任何傳入記錄。我不在乎保留哪條記錄,即使記錄的重複只是部分。我使用追加模式,因爲數據只是通過spark.sql()方法進行了豐富/過濾,沒有通過/窗口聚合進行分組。然後,我使用追加模式將實木複合地板文件寫入s3。

根據文檔,我應該能夠使用不帶水印的dropDuplicates進行重複數據刪除(顯然這在長時間運行的生產中無效)。然而,這種失敗,出現錯誤:

用戶類拋出異常:org.apache.spark.sql.AnalysisException:當有在流DataFrames /數據集流聚合不支持追加輸出模式

該錯誤似乎很奇怪因爲我沒有進行聚合(除非dropDuplicates或sparkSQL算作聚合?)。

我知道重複不會發生在彼此的3天之外,所以我通過添加一個水印(通過緊接在重複之前使用.withWatermark())再次嘗試它。但是,似乎要等到寫入數據之前的3天。 (即從今天7月24日起,只有截至7月21日同一時間的數據才寫入輸出)。

由於沒有聚合,我希望在處理批處理後立即寫入每一行,並簡單地丟棄任何具有在前3天內發生的事件標識的行。有沒有簡單的方法來實現這一點?

感謝

回答

0

解決方案。將apache.spark.sql.execution.streaming.Sink插入到配置單元表中,然後在批處理中刪除重複項,並對目標配置單元表中前幾天的數據執行左反連接。

1

以我爲例,我曾經通過DSTREAM實現在兩個方面:

方式一:

  1. 負載tmp_data(包含3個天內的唯一數據,見下文)
  2. 收到batch_data and do leftOuterJoin with tmp_data
  3. 在第二步和輸出做filter新的獨特的數據
  4. 更新tmp_data新的獨特的數據通過第二步的結果,並刪除舊數據(3天以上)
  5. 保存tmp_data HDFS上或任何
  6. 重複上面的再次並再次

另一種方式:

  1. 創建MySQL的一個表並設置UNIQUE INDEXEVENT_ID
  2. 收到batch_data,只是保存事項標識+ EVENT_TIME +無論到mysql
  3. MySQL會忽略重複我們使用的是組織的自定義實現自動
相關問題