2016-10-03 50 views
1

我想知道在Flink中是否有內置的錯誤處理選項。 可能有兩種情況:apache flink - 錯誤處理的正確方法

  1. 從卡夫卡當前消息(對我來說)是無效的,繼續下一個

  2. 未捕獲的異常 - 從我看到它可以完全停止流聚集。

ho我可以處理這兩種情況嗎? (Java代碼)

回答

3

1)這是一個flatMap慣用做:如果你的消息是有效的,你去與包含您的有效元素(或許已經在同一工序處理)的列表。如果它無效,則只需返回一個空列表,以便該步驟不生成任何元素。我可以提供Scala代碼,但我不熟悉Java API,所以我不想讓你偏離軌道。只需檢查flatMap呼叫。

2)這取決於異常的類型:如果它是由你自己的代碼挑起的,正好趕上它,處理它的運營商裏面,或者簡單地記錄它,繼續前進。沒有任何有關特定案例的進一步信息,這是我所知道的最好的,但再次,來自Scala我沒有遇到運行時異常。

+0

謝謝!約.1。我的收集功能獲得了一個Tupple。我可以用空來調用它嗎?它會起作用嗎?或者最好不要叫'collect'? –

+0

對,對不起。我的回答集中在Scala API上,但很明顯與Java API中的收集器有點不同。你可以避免調用'collect',是的。 – Chobeat