2017-02-02 87 views
0

我們的Flink streaming工作流發佈消息給Kafka。 KafkaProducer的「重試」機制在內部緩衝區中添加消息之前不會啓動。Fink:KafkaProducer數據丟失

如果在此之前出現異常,KafkaProducer將拋出該異常,並且好像Flink沒有處理該異常。在這種情況下會有數據丟失。

相關弗林克代碼(FlinkKafkaProducerBase):

if (logFailuresOnly) { 
      callback = new Callback() { 
       @Override 
       public void onCompletion(RecordMetadata metadata, Exception e) { 
        if (e != null) { 
         LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); 
        } 
        acknowledgeMessage(); 
       } 
      }; 
     } 
     else { 
      callback = new Callback() { 
       @Override 
       public void onCompletion(RecordMetadata metadata, Exception exception) { 
        if (exception != null && asyncException == null) { 
         asyncException = exception; 
        } 
        acknowledgeMessage(); 
       } 
      }; 
     } 

以下是該方案的,我們已經確定了會導致數據丟失:

  1. 所有卡夫卡經紀人下來。

    在這種情況下,在將消息追加到緩衝區之前,KafkaProducer嘗試獲取元數據。如果KafkaProducer無法在配置的超時中獲取元數據,則會引發異常。

  2. -Memory記錄不可寫(在卡夫卡0.9.0.1庫現有錯誤)

https://issues.apache.org/jira/browse/KAFKA-3594

在上述兩種情況下,KafkaProducer不會重試,和弗林克將忽略消息。這些消息甚至沒有記錄。例外是,但不是失敗的消息。

可能的解決方法(卡夫卡設置):

  1. 一種用於元數據超時非常高的值(metadata.fetch.timeout.ms)
  2. 一種用於緩衝期滿非常高的值(request.timeout.ms)

我們仍在調查改變上述卡夫卡設置的可能副作用。

那麼,我們的理解是否正確?或者有沒有辦法通過修改某些Flink設置來避免這種數據丟失?

謝謝。

回答

0

這是我正在考慮你的問題。 見卡夫卡的一個保證第一:

對於複製因子N的話題,我們將承受最多N-1服務器故障不失致力於日誌任何記錄

首先,它關心提交給日誌的消息或記錄。任何未能交付的記錄都不會被視爲已提交。其次,如果你所有的經紀人都倒閉了,會有一些數據丟失。下面

設置是什麼,我們使用,以防止對製片方的數據丟失:

  • block.on.buffer。全=真
  • 的ACK =所有
  • 重試= MAX_VALUE
  • max.in.flight.requests.per.connection = 1
  • 使用KafkaProducer.send(記錄,回調),而不是發送(記錄)
  • unclean.leader.election.enable =假
  • replication.factor> min.insync.replicas
  • min.insync.replicas> 1
+0

您好紫水晶我知道了。感謝您的答覆。是的,我們有上述所有設置。所以,即使所有經紀商都下跌,我們也不應該有數據丟失。這實際上是Flink中的一個錯誤。請檢查:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html – Ninad

+0

在「異步」的情況下不是這樣「版本(<= 0.9)中的發佈者,他們不會捕獲回調中的所有故障(如網絡,批量到期),並且您仍然可能在那裏丟失了一些消息? – kisna

+0

這些設置來自新制作者。 – amethystic