2017-04-13 45 views
0

我有一個相當簡單的卡夫卡設置 - 1個製作人,1個主題,10個分區,10個KafkaConsumers全部具有相同的組ID,全部在單臺機器上運行。當我處理一個文件時,製作者很快創建了3269條消息,消費者開始消費。一切都運行良好一段時間,但在某個時候,消費者開始消費重複 - 很多重複。實際上,它們看起來像是開始重新使用消息隊列。如果我讓它運行很長時間,數據庫將開始接收相同的數據條目6次或更多次。通過日誌記錄進行一些測試後,看起來消費者正在重複使用具有相同唯一消息名稱的相同消息。卡夫卡0.10.2消費者獲得大量副本

據我所知,沒有重新平衡正在發生。消費者不會死亡或被添加。這是同樣的10個消費者,一次又一次地消耗相同的3269個消息,直到我終止這個過程。如果我放手,消費者會寫出數十萬條記錄,大量增加真正應該進入數據庫的數據量。

我對卡夫卡相當陌生,但對於這種情況發生的原因我感到茫然。我知道卡夫卡並不能保證一次處理完成,而且我可以在這裏和那裏複製一對夫婦。我有代碼來防止再次保留相同的記錄。但是,我不確定爲什麼消費者會一遍又一遍地重複使用隊列。我知道卡夫卡消息在消費後並未被刪除,但是如果所有消費者都在同一組中,則補償應該防止這種情況發生,對吧?我對補償的工作原理略有了解,但據我所知,如果沒有重新平衡,他們不應該被重置,對吧?據我所知,這些消息不會超時。有沒有辦法讓我的消費者一次性使用隊列中的所有東西,然後等待更多的消息而不再重複使用相同的東西?

下面是我通過在生產者和消費者根據企業的性質:

Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("acks", "all"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1); 
     props.put("buffer.memory", 33554432); 
     props.put("group.id", "MyGroup"); 
     props.put("num.partitions", 10); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

     MyIngester ingester = new MyIngester(args[0], props); 

回答

1

對我來說這似乎是確認收到的問題。 請嘗試以下屬性

props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "100");