該實現是在Python中實現的。使用confluent_kafka。卡夫卡消費者中的控制消息偏移量
我有一個消費者對象來輪詢來自kafka主題的消息。這些消息用於其他大型對象的進一步處理,並且由於大小而無法在每次消息處理後備份對象。
我週期性地轉儲對象,然後手動提交使用者。以下是我實施的示例代碼。
from confluent_kafka import Consumer, KafkaError, TopicPartition
c = Consumer({
'bootstrap.servers': 'myserver',
'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'},
'enable.auto.commit': "false"
})
c.subscribe(['mytopic'])
offsets = {}
for i in range(10):
msg = c.poll()
if msg.error():
continue
par = msg.partition()
off = msg.offset()
offsets[p] = off
c.commit(async=False)
print(offsets)
當我運行該代碼的第二時間,我期望消息偏移量,如果來自相同分區,應該是下一個,即1,與上印偏移。
但是補償提前了很多。數百個。
我也試圖手動分配位置如下:
lst_part = []
for par, off in offsets.items():
lst_part.append(TopicPartition('mytopic', par, off))
c.assign(lst_part)
# then start polling messages
新輪詢消息不是已分配的偏移+ 1