2017-10-04 224 views

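This Kafka consumer should consume from a single topic. Because I am integrating the Kafka consumer API into a Spring Core web application, I cannot use Spring Boot. The @KafkaListener method is never invoked, and the consumer does not consume any messages.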

The Spring XML configuration is as follows:

<bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties"> 
    <constructor-arg type="java.lang.String" value="127.0.0.1:9092" /> 
    <constructor-arg type="java.lang.String" value="tdm-group" /> 
    <constructor-arg type="java.lang.String" value="dbStreamer.azuga.tripDriverMapping" /> 
</bean> 
<bean id="kafkaListenerConfig" class="com.azuga.kafka.listeners.KafkaListenerConfig"> 
    <property name="kafkaConsumerProperties" ref="kafkaConsumerProperties" /> 
</bean> 
<bean id="kafkaContainerFactory" class="com.azuga.kafka.listeners.KafkaListenerContainerFactory" 
    factory-method="kafkaContainerFactory"> 
</bean> 

This is the class that creates the ListenerContainerFactory:

@EnableKafka 
public class KafkaListenerContainerFactory { 

public static ConcurrentKafkaListenerContainerFactory<String, String> kafkaContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(1); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.getContainerProperties().setPollTimeout(3000); 
    return factory; 
} 

@SuppressWarnings("unchecked") 
public static ConsumerFactory<String, String> consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(KafkaListenerConfig.consumerProps(), 
      KafkaListenerConfig.stringKeyDeserializer(), KafkaListenerConfig.stringKeyDeserializer()); 
} 

} 

This is my listener class, annotated with @KafkaListener:

package com.azuga.kafka.listeners; 

import org.springframework.kafka.annotation.KafkaListener; 
public class Listener { 

@KafkaListener(topics = "dbStreamer.azuga.tripDriverMapping") 
public void onMessage(String message) { 
    System.out.println(message.toString()); 
} 
} 

This is the KafkaListenerConfig class, which takes in the bootstrap servers, group id, topic name, and so on:

@EnableKafka 
public class KafkaListenerConfig { 

private static KafkaConsumerProperties kafkaConsumerProperties; 

public void setKafkaConsumerProperties(KafkaConsumerProperties kafkaConsumerProperties) { 
    this.kafkaConsumerProperties = kafkaConsumerProperties; 
} 

public static Map<String, Object> consumerProps() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap()); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup()); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
    return props; 
} 

public static Deserializer stringKeyDeserializer() { 
    return new StringDeserializer(); 
} 

} 

Answer


You have a somewhat unusual configuration for your application.

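But I think you are missing the fact that @EnableKafka is meant for @Configuration classes. Therefore, according to the Spring Framework documentation, you have to use the AnnotationConfigWebApplicationContext class: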

* {@link org.springframework.web.context.WebApplicationContext WebApplicationContext} 
* implementation which accepts annotated classes as input - in particular 
* {@link org.springframework.context.annotation.Configuration @Configuration}-annotated 
* classes, but also plain {@link org.springframework.stereotype.Component @Component} 
* classes and JSR-330 compliant classes using {@code javax.inject} annotations. Allows 
* for registering classes one by one (specifying class names as config location) as well 
* as for classpath scanning (specifying base packages as config location). 
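
As a rough sketch of what that bootstrapping could look like in a Servlet 3.0+ container without Spring Boot: the initializer below creates an AnnotationConfigWebApplicationContext and scans the package used in the question. The class name KafkaWebAppInitializer is hypothetical, and the javax.servlet namespace is an assumption (newer Spring versions use jakarta.servlet instead). Scanning only picks up classes that actually carry @Configuration or @Component.

```java
package com.azuga.kafka.listeners;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;

import org.springframework.web.WebApplicationInitializer;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

// Hypothetical initializer: the class name is an assumption, not from the question.
public class KafkaWebAppInitializer implements WebApplicationInitializer {

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        // Annotation-driven root context instead of the default XML-based one.
        AnnotationConfigWebApplicationContext rootContext =
                new AnnotationConfigWebApplicationContext();

        // Classpath scanning by base package; finds @Configuration and
        // @Component classes in the package used in the question.
        rootContext.scan("com.azuga.kafka.listeners");

        // Tie the context lifecycle to the web application.
        servletContext.addListener(new ContextLoaderListener(rootContext));
    }
}
```

Individual classes can also be registered one by one with rootContext.register(...) instead of scanning a package.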

Unfortunately, a plain XML configuration alone will not work.

Spring Kafka does not provide any hooks for XML definitions.
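
A minimal sketch of an annotation-based equivalent, assuming the broker address, group id, and topic from the question. The class names KafkaConsumerConfig and TripDriverMappingListener are hypothetical. Two details matter here: @KafkaListener looks up a container factory bean named kafkaListenerContainerFactory by default, and the listener itself has to be a bean in the application context.

```java
package com.azuga.kafka.listeners;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical configuration class; @EnableKafka only takes effect
// when it sits on a @Configuration class processed by the context.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Values copied from the XML in the question.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tdm-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // The bean name matches the default that @KafkaListener looks up.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        return factory;
    }

    // The listener must be a Spring bean, otherwise its annotation is never processed.
    @Bean
    public TripDriverMappingListener tripDriverMappingListener() {
        return new TripDriverMappingListener();
    }

    // Hypothetical listener, mirroring the Listener class from the question.
    public static class TripDriverMappingListener {

        @KafkaListener(topics = "dbStreamer.azuga.tripDriverMapping")
        public void onMessage(String message) {
            System.out.println(message);
        }
    }
}
```

If the factory bean has a different name, it can be referenced explicitly through the containerFactory attribute of @KafkaListener.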


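Thanks for the quick reply. I wasn't aware that it does not work with XML configuration. I switched to annotations, though, and it works like a charm. – Sabya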