
We are using Kafka (0.9.0.0) to orchestrate command messages between different microservices. We are seeing an intermittent issue where duplicate messages are delivered to a particular topic. The log output when the issue occurs is shown below. Can someone help us understand this problem of Kafka delivering duplicate messages?

Wed, 21-Sep-2016 09:19:07 - WARNING Coordinator unknown during heartbeat -- will retry 
Wed, 21-Sep-2016 09:19:07 - WARNING Heartbeat failed; retrying 
Wed, 21-Sep-2016 09:19:07 - WARNING <BrokerConnection host=AZSG-D-BOT-DEV4 port=9092> timed out after 40000 ms. Closing connection. 
Wed, 21-Sep-2016 09:19:07 - ERROR Fetch to node 1 failed: RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request. 
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None. 
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092) 
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin 
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery. 
Wed, 21-Sep-2016 09:19:07 - ERROR LeaveGroup request failed: UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation. 
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None. 
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092) 
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin 
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery. 
Wed, 21-Sep-2016 09:19:10 - INFO Joined group 'kafka-python-default-group' (generation 5) with member_id kafka-python-1.0.2-8585f310-cb4f-493a-a98d-12ec9810419b 
Wed, 21-Sep-2016 09:19:10 - INFO Updated partition assignment: [TopicPartition(topic=u'ilinaTestPlatformReq', partition=0)] 

This can happen if the consumer is blocked processing messages for longer than 'session.timeout'. How long does your consumer take to process a message? – Aliaxander


Yes, that's right. The failure occurs roughly when the consumer takes more than 30 seconds; otherwise everything works fine. So is increasing session.timeout the correct fix? –


By the way, please include your kafka-python version. – Aliaxander

Answer


Kafka documentation on Consumer config

session.timeout.ms (default 30000) - The timeout used to detect failures when using Kafka's group management facilities. When a consumer's heartbeat is not received within the session timeout, the broker will mark the consumer as failed and rebalance the group. Since heartbeats are sent only when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's poll loop, at the cost of a longer time to detect hard failures. See also max.poll.records for another option to control the processing time in the poll loop. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

It seems that if message processing takes longer than 30000 ms, a consumer rebalance is triggered, which can result in duplicate message delivery. What you can try is increasing session.timeout.ms.
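The member_id in your log shows kafka-python 1.0.2, where this setting is the session_timeout_ms constructor argument. A minimal sketch, assuming a placeholder broker address, a hypothetical 90-second value (it must stay inside the broker's group.min/group.max.session.timeout.ms range) and a hypothetical process() handler:

from kafka import KafkaConsumer

# Sketch only: broker address and 90 s timeout are placeholder values.
consumer = KafkaConsumer(
    'ilinaTestPlatformReq',               # topic seen in the question's log
    bootstrap_servers='localhost:9092',   # placeholder broker
    group_id='kafka-python-default-group',
    session_timeout_ms=90000,             # raised from the 30000 ms default
)

for message in consumer:
    process(message)                      # hypothetical handler that may take >30 s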

Another option is to process messages asynchronously, calling pause() before processing and resume() after processing. In that case the consumer keeps calling poll() (and therefore keeps sending heartbeats) even if processing takes longer than session.timeout.ms, so the broker will not mark your consumer as failed and will not initiate a rebalance.
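A minimal sketch of that pause()/resume() pattern with kafka-python, assuming a placeholder broker address and a stand-in handler; offsets are committed only after processing so a crash re-delivers rather than loses messages:

import threading
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'ilinaTestPlatformReq',               # topic seen in the question's log
    bootstrap_servers='localhost:9092',   # placeholder broker
    group_id='kafka-python-default-group',
    enable_auto_commit=False,
)

def handle(batch):
    # stand-in for the slow (>30 s) message processing
    for record in batch:
        pass

while True:
    fetched = consumer.poll(timeout_ms=1000)
    if not fetched:
        continue

    batch = [msg for msgs in fetched.values() for msg in msgs]
    worker = threading.Thread(target=handle, args=(batch,))
    worker.start()

    # Pause fetching: poll() now returns no records but still keeps the
    # group membership (heartbeats) alive while the worker runs.
    consumer.pause(*consumer.assignment())
    while worker.is_alive():
        consumer.poll(timeout_ms=1000)
    consumer.resume(*consumer.assignment())

    # Commit only after the batch has actually been processed.
    consumer.commit()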


Thanks Aliaxander. Solved the problem by increasing the following values: group.min.session.timeout.ms = xxx, session.timeout.ms = xxx, group.max.session.timeout.ms = xxx –