我們的Flink streaming工作流發佈消息給Kafka。 KafkaProducer的「重試」機制在內部緩衝區中添加消息之前不會啓動。Fink:KafkaProducer數據丟失
如果在此之前出現異常,KafkaProducer將拋出該異常,並且好像Flink沒有處理該異常。在這種情況下會有數據丟失。
相關弗林克代碼(FlinkKafkaProducerBase):
if (logFailuresOnly) {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
}
acknowledgeMessage();
}
};
}
else {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null && asyncException == null) {
asyncException = exception;
}
acknowledgeMessage();
}
};
}
以下是該方案的,我們已經確定了會導致數據丟失:
所有卡夫卡經紀人下來。
在這種情況下,在將消息追加到緩衝區之前,KafkaProducer嘗試獲取元數據。如果KafkaProducer無法在配置的超時中獲取元數據,則會引發異常。
-Memory記錄不可寫(在卡夫卡0.9.0.1庫現有錯誤)
https://issues.apache.org/jira/browse/KAFKA-3594
在上述兩種情況下,KafkaProducer不會重試,和弗林克將忽略消息。這些消息甚至沒有記錄。例外是,但不是失敗的消息。
可能的解決方法(卡夫卡設置):
- 一種用於元數據超時非常高的值(metadata.fetch.timeout.ms)
- 一種用於緩衝期滿非常高的值(request.timeout.ms)
我們仍在調查改變上述卡夫卡設置的可能副作用。
那麼,我們的理解是否正確?或者有沒有辦法通過修改某些Flink設置來避免這種數據丟失?
謝謝。
您好紫水晶我知道了。感謝您的答覆。是的,我們有上述所有設置。所以,即使所有經紀商都下跌,我們也不應該有數據丟失。這實際上是Flink中的一個錯誤。請檢查:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html – Ninad
在「異步」的情況下不是這樣「版本(<= 0.9)中的發佈者,他們不會捕獲回調中的所有故障(如網絡,批量到期),並且您仍然可能在那裏丟失了一些消息? – kisna
這些設置來自新制作者。 – amethystic