2016-03-15 95 views

I have a SOAP web service that sends a Kafka request message and then waits for a Kafka response message (e.g. consumer.poll(10000)). Why does the Kafka consumer keep receiving the same messages (the same offsets)?

Each time the web service is called, a new Kafka producer and a new Kafka consumer are created.

Each time I call the web service, the consumer receives the same messages (i.e. messages with the same offsets).

I am using Kafka 0.9 with auto-commit enabled and an auto-commit interval of 100 ms.
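For reference, the auto-commit settings described above would be supplied to the consumer roughly like this. This is a minimal sketch, not the poster's actual code: only the properties relevant to the question are shown, with the broker list and group id taken from the config dump further down.

```java
import java.util.Properties;

// Minimal sketch of the consumer configuration described in the question,
// using the standard Kafka 0.9 new-consumer property keys.
public class ConsumerConfigSketch {

    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "machine1:6667,machine2:6667,machine3:6667,machine0:6667");
        props.put("group.id", "group-A.group");
        // Auto-commit runs inside poll(): enabling it has no effect
        // if poll() is only ever called once.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig());
    }
}
```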

For each ConsumerRecord returned by the poll() method, I submit a task to an executor in its own Callable, e.g.:

ConsumerRecords<String, String> records = consumer.poll(200); 

for (ConsumerRecord<String, String> record : records) { 
    final Handler handler = new Handler(record); 
    executor.submit(handler); 
} 

Why do I keep receiving the same messages over and over?

UPDATE 0001

metric.reporters = [] 
metadata.max.age.ms = 300000 
value.deserializer = class com.kafka.MDCDeserializer 
group.id = group-A.group 
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] 
reconnect.backoff.ms = 50 
sasl.kerberos.ticket.renew.window.factor = 0.8 
max.partition.fetch.bytes = 1048576 
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667] 
retry.backoff.ms = 100 
sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.service.name = kafka 
sasl.kerberos.ticket.renew.jitter = 0.05 
ssl.keystore.type = JKS 
ssl.trustmanager.algorithm = PKIX 
enable.auto.commit = true 
ssl.key.password = null 
fetch.max.wait.ms = 500 
sasl.kerberos.min.time.before.relogin = 60000 
connections.max.idle.ms = 540000 
ssl.truststore.password = null 
session.timeout.ms = 30000 
metrics.num.samples = 2 
client.id = 
ssl.endpoint.identification.algorithm = null 
key.deserializer = class com.kafka.UUIDDerializer 
ssl.protocol = TLS 
check.crcs = true 
request.timeout.ms = 40000 
ssl.provider = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.keystore.location = null 
heartbeat.interval.ms = 3000 
auto.commit.interval.ms = 5000 
receive.buffer.bytes = 32768 
ssl.cipher.suites = null 
ssl.truststore.type = JKS 
security.protocol = PLAINTEXTSASL 
ssl.truststore.location = null 
ssl.keystore.password = null 
ssl.keymanager.algorithm = IbmX509 
metrics.sample.window.ms = 30000 
fetch.min.bytes = 1024 
send.buffer.bytes = 131072 
auto.offset.reset = latest 

Is it possible that your Kafka consumer is configured to always start from the smallest offset? –

Answer


Based on the code you are showing, I think your problem is that the new consumer is single-threaded. If you poll once and then never poll again, auto-commit does not take effect. Try putting your code inside a while loop and see whether the offsets get committed once poll is called again.
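The fix can be sketched as follows, assuming the Kafka 0.9 new-consumer API (where poll() takes a timeout in milliseconds). The topic name and the Handler hand-off are illustrative stand-ins for the poster's code, and this sketch needs a live broker to actually run:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "machine1:6667");
        props.put("group.id", "group-A.group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("response-topic")); // hypothetical topic name

        try {
            // Polling in a loop keeps the consumer alive; auto-commit of the
            // previous batch's offsets happens inside subsequent poll() calls.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    // Hand off to a worker, as in the question's code.
                    System.out.printf("offset=%d value=%s%n",
                            record.offset(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

Calling poll() only once, as the original web-service code did, means the offsets of the returned batch are never committed, so the next consumer instance re-reads the same messages.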


Good point! Once we added a loop that calls poll multiple times, it started working as expected. Thank you very much for your time – Hector


hector no problem; if http://stackoverflow.com/questions/35991849/how-does-should-kafka-consumer-cope-with-poison-messages/35993439#35993439 also helped you, consider accepting it. – Nautilus