0
我正在使用春季卡夫卡1.0.3消費卡夫卡消息。 2在kafka中的主題和每個主題都有1個分區。 在java代碼中,有2個@KafkaListener消費每個主題消息。 ConcurrentKafkaListenerContainerFactory的併發性設置爲1.但消息有時會延遲20多秒。春季卡夫卡消息消費延遲
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.brokers}")
private String brokers;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "server-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
@KafkaListenr代碼如下:
@KafkaListener(id = "serverInChannel",topics = CommonContants.KAFKA_TOPIC.SERVER_IN_CHANNEL)
public void consumeInMessage(@Payload String data,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
Acknowledgment ack) {
logger.info("[serverInMessage]data=" + data);
ack.acknowledge();
}
@KafkaListener(id = "webOutChannel",topics = CommonContants.KAFKA_TOPIC.WEB_OUT_CHANNEL)
public void consumeOutMessage(@Payload String data,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
Acknowledgment ack) {
logger.info("[webOutMessage]data=" + data);
ack.acknowledge();
}
卡夫卡消息被彈簧卡夫卡發送和日誌是在這裏:
2016-09-26 16:16:42,777 [serverInChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-26 16:16:44,101 [webOutChannel-0-kafka-listener-6] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-26 16:16:45,551 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0]
2016-09-26 16:16:45,562 [serverInChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-26 16:16:45,663 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0]
2016-09-26 16:16:45,715 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[]
2016-09-26 16:16:45,781 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0]
2016-09-26 16:16:45,805 [webOutChannel-0-kafka-listener-7] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-26 16:16:45,870 [webOutChannel-0-kafka-listener-7] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-26 16:17:01,099 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
2016-09-26 16:17:02,054 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0]
2016-09-26 16:17:02,108 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0]
2016-09-26 16:17:02,120 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0]
2016-09-26 16:17:02,133 [serverInChannel-0-kafka-listener-5] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
沒有在日誌中一些partitions assigned:[]
,我不知道爲什麼分配是空的。當webOutMessage消息在2016-09-26 16:16:45,870
處消耗後,下一個動作是在2016-09-26 16:17:01,099
的15+秒之後的消費者重新平衡,因此下一個消息在15+秒內延遲。
有人知道爲什麼嗎?
添加調試日誌,並延遲不是每次,有時是確定:
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-listener-2] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=179, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 179, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-listener-2] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}}
2016-09-27 15:33:58,329 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}}
2016-09-27 15:33:58,623 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Stopping invoker
2016-09-27 15:33:58,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Stopping invoker
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Invoker stopped
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:33:59,248 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[web_message_out_channel-0]
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Invoker stopped
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:33:59,330 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[server_message_in_channel-0]
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {}
2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {}
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[]
2016-09-27 15:33:59,405 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[]
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:33:59,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:33:59,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:33:59,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:00,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:00,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:00,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:00,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:01,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:01,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:01,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:01,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:02,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:02,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:02,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:02,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:03,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:03,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:03,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:03,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:04,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:04,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:04,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:04,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:05,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:05,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:05,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:05,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:06,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:06,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:06,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:06,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:07,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:07,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:07,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:07,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:08,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:08,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:08,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:08,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:09,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:09,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:09,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:09,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:10,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:10,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:10,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:10,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:11,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:11,405 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:11,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:11,624 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:12,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:12,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:12,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:12,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:13,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:13,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:13,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:13,626 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:14,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:14,406 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:14,599 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
2016-09-27 15:34:14,599 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
2016-09-27 15:34:15,276 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {web_message_out_channel-0=OffsetAndMetadata{offset=174, metadata=''}}
2016-09-27 15:34:15,286 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=180, metadata=''}}
2016-09-27 15:34:15,516 [serverInChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[server_message_in_channel-0]
2016-09-27 15:34:15,516 [webOutChannel-0-kafka-consumer-1] [INFO] o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[web_message_out_channel-0]
2016-09-27 15:34:15,518 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:15,518 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:15,518 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2016-09-27 15:34:15,518 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:15,787 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 3 records
2016-09-27 15:34:15,787 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=174, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 174, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-27 15:34:15,792 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=175, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 175, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=176, kafka_receivedTopic=web_message_out_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = web_message_out_channel, partition = 0, offset = 176, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,793 [webOutChannel-0-kafka-listener-4] [INFO] c.x.u.n.s.l.MessageListener - [webOutMessage]data=123abc
2016-09-27 15:34:15,794 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {web_message_out_channel-0=OffsetAndMetadata{offset=177, metadata=''}}
2016-09-27 15:34:15,794 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {web_message_out_channel-0=OffsetAndMetadata{offset=177, metadata=''}}
2016-09-27 15:34:15,795 [webOutChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Woken up during commit
2016-09-27 15:34:15,796 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 2 records
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-listener-3] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=180, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 180, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,805 [serverInChannel-0-kafka-listener-3] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-27 15:34:15,806 [serverInChannel-0-kafka-listener-3] [DEBUG] o.s.k.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=123abc, headers={kafka_receivedPartitionId=0, kafka_receivedMessageKey=1234567890-1474961636372, kafka_offset=181, kafka_receivedTopic=server_message_in_channel, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = server_message_in_channel, partition = 0, offset = 181, key = 1234567890-1474961636372, value = 123abc)}]]
2016-09-27 15:34:15,806 [serverInChannel-0-kafka-listener-3] [INFO] c.x.u.n.s.l.MessageListener - [serverInMessage]data=123abc
2016-09-27 15:34:15,808 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {server_message_in_channel-0=OffsetAndMetadata{offset=182, metadata=''}}
2016-09-27 15:34:15,808 [serverInChannel-0-kafka-consumer-1] [DEBUG] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {server_message_in_channel-0=OffsetAndMetadata{offset=182, metadata=''}}
爲o.s.kafka和o.a.k.clients啓用DEBUG日誌記錄以查看它是否提供更多信息。 –
在問題結尾添加一些調試日誌。 – tkec