2015-09-28 84 views
0

作爲我們當前Kafka集羣的一部分,高可用性測試(HA)正在完成。目標是,當生產者工作將數據推送到某個主題的特定分區時,所有Kafka集羣中的經紀人都會按順序重新啓動(Stop-first broker-重新啓動它,在第一個經紀人出現後,爲第二個經紀人執行相同的步驟,不久)。生產者工作正在推進約700萬條記錄約30分鐘,而這個測試正在進行。在工作結束時,有人注意到大約有1000條記錄丟失。Kafka滾動重啓:數據丟失

下面是我們卡夫卡集羣的細節:(kafka_2.10-0.8.2.0)

-3卡夫卡經紀人每個具有2 100GB安裝

主題用創建: -Replication 3 -min.insync.replica的因子= 2

server.properties:

broker.id=1 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=1048576 
socket.receive.buffer.bytes=1048576 
socket.request.max.bytes=104857600 
log.dirs=/drive1,/drive2 
num.partitions=1 
num.recovery.threads.per.data.dir=1 
log.flush.interval.messages=10000 
log.retention.hours=1 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=1800000 
log.cleaner.enable=false 
zookeeper.connect=ZK1:2181,ZK2:2181,ZK3:2181 
zookeeper.connection.timeout.ms=10000 
advertised.host.name=XXXX 
auto.leader.rebalance.enable=true 
auto.create.topics.enable=false 
queued.max.requests=500 
delete.topic.enable=true 
controlled.shutdown.enable=true 
unclean.leader.election=false 
num.replica.fetchers=4 
controller.message.queue.size=10 

Producer.properties(aync與新的製作API生產商)

bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 
acks=all 
buffer.memory=33554432 
compression.type=snappy 
batch.size=32768 
linger.ms=5 
max.request.size=1048576 
block.on.buffer.full=true 
reconnect.backoff.ms=10 
retry.backoff.ms=100 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 

有人可以分享關於卡夫卡集羣和HA的任何信息,以確保在滾動重新啓動卡夫卡經紀商的數據會不會丟失?

此外,這是我的生產者代碼。這是一場火災,忘了那種製作人。截至目前,我們並沒有明確地處理失敗。幾乎可以處理幾百萬條記錄。只有當卡夫卡經紀人按照上面的說明重新啓動時,我纔看到問題。

public void sendMessage(List<byte[]> messages, String destination, Integer parition, String kafkaDBKey) { 

    for(byte[] message : messages) { 

     producer.send(new ProducerRecord<byte[], byte[]>(destination, parition, kafkaDBKey.getBytes(), message)); 

    } 

} 
+0

你總共跑了多久? 難道是因爲保留而刪除了一些消息嗎? 'log.retention.check'設置爲30分鐘,'log.retention.hours'設置爲1h。 – alpe1

+0

我跑了30分鐘的工作。保留期約爲4小時。我沒有看到任何數據正在被刪除。我通過驗證該分區的最早和最近的偏移來證實這一點。 –

+0

你的生產者如何處理失敗?發佈其代碼是可能的嗎? – nelsonda

回答

1

通過將生產者端的默認重試值從0增加到4000,我們能夠成功發送數據而不會丟失。 retries=4000

  • 由於此設置,有發送相同的消息兩次的可能性和消息是失序的由消費者接收它的時間(第二MSG可能首先MSG之前到達)。但是,對於我們目前的問題,這不是問題,並在消費者方面處理,以確保一切順利。
相關問題