2017-04-05 147 views
1

該實現是在Python中實現的。使用confluent_kafka。卡夫卡消費者中的控制消息偏移量

我有一個消費者對象來輪詢來自kafka主題的消息。這些消息用於其他大型對象的進一步處理,並且由於大小而無法在每次消息處理後備份對象。

我週期性地轉儲對象,然後手動提交使用者。以下是我實施的示例代碼。

from confluent_kafka import Consumer, KafkaError, TopicPartition 

c = Consumer({ 
    'bootstrap.servers': 'myserver', 
    'group.id': 'mygroup', 
    'default.topic.config': {'auto.offset.reset': 'smallest'}, 
    'enable.auto.commit': "false" 
}) 
c.subscribe(['mytopic']) 

offsets = {} 

for i in range(10): 
    msg = c.poll() 

    if msg.error(): 
     continue 

    par = msg.partition() 
    off = msg.offset() 
    offsets[p] = off 

c.commit(async=False) 

print(offsets) 

當我運行該代碼的第二時間,我期望消息偏移量,如果來自相同分區,應該是下一個,即1,與上印偏移。

但是補償提前了很多。數百個。

我也試圖手動分配位置如下:

lst_part = [] 

for par, off in offsets.items(): 
    lst_part.append(TopicPartition('mytopic', par, off)) 

c.assign(lst_part) 

# then start polling messages 

新輪詢消息不是已分配的偏移+ 1

回答

1

c.commit(async=False)將承諾針對其信息已被全部耗盡分區通過poll()呼叫從客戶端返回到應用程序。

如果你想更精細的控制可衝抵提交你可以傳遞一個明確的[TopicPartition(..)]列表commit()(請務必提交last_message_offset + 1)或禁止auto.offset.store和顯式調用store_offsets()您要的信息/偏移爲未來commit()打電話。

請注意,store_offsets()僅適用於master,並且在confluent-kafka-python客戶端的發佈版本中尚不可用,但很快會出現。

相關問題