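Spring Integration Kafka: org.apache.kafka.common.config.ConfigException when running consumer

I am trying to run a Kafka consumer from code and it always throws an exception. When I run kafka-console-consumer.sh to check the producer, it works fine and shows all the messages received by the broker. Below are the code, the pom.xml, and the exception log. Please tell me what is wrong.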

public Map<String, Object> consumerConfigs() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:2181"); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_coonfig"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.IntegerDeserializer"); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringDeserializer"); 
    return props; 
} 

Here is my test class code.

@Test 
public void testSpringKafkaConsumer() throws InterruptedException { 

    try{ 
    String topics[] = { "programTopic3" }; 
    ConsumerFactory<Integer, String> factory = new DefaultKafkaConsumerFactory<>(configs); 
    factory.createConsumer(); 
    AbstractMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(factory, 
      topics); 
    container.setBeanName("container"); 

    final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>(); 
    container.setMessageListener(new MessageListener<Integer, String>() { 

     @Override 
     public void onMessage(ConsumerRecord<Integer, String> message) { 
      // logger.info("received: " + message); 
      System.out.println("received: --------+++++++++++++++------------" + message); 
      records.add(message); 
     } 
    }); 
    KafkaMessageDrivenChannelAdapter<Integer, String> adaptor = new KafkaMessageDrivenChannelAdapter<>(container); 

    adaptor.start(); 
    ConsumerRecord<Integer, String> poll = null; 
    while((poll =records.take()) != null){ 
     System.out.println(poll.topic() + " topic"); 
     System.out.println(poll.key() + " key"); 
     System.out.println(poll.value()+ " value"); 
    } 

    }catch(Exception exception) 
    { 
     exception.printStackTrace(); 
     Assert.fail(); 
    } 
} 

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.learn.kafka.integrate.spring</groupId>
    <artifactId>SpringIntegrationKafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>SpringIntegrationKafka</name>
    <description>Demo project for Spring Integration kafka</description>

    <properties>
        <springVersion>4.2.5.RELEASE</springVersion>
        <springIntegrationVersion>4.2.5.RELEASE</springIntegrationVersion>
        <mockitoVersion>1.10.19</mockitoVersion>
    </properties>
    <repositories>
        <repository>
            <id>repository.spring.milestone</id>
            <name>Spring Milestone Repository</name>
            <url>http://repo.spring.io/milestone</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${springIntegrationVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.0.0.M1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${springVersion}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <version>${springVersion}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Exception log:

org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value. 
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:148) 
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49) 
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56) 
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336) 
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512) 
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494) 
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:46) 
at com.learn.kafka.integrate.spring.TestConsumer.testSpringKafkaConsumer(TestConsumer.java:83) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
at java.lang.reflect.Method.invoke(Unknown Source) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) 
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) 
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:254) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) 
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:193) 
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 
You haven't shown the whole story: ConsumerFactory<Integer, String> factory = new DefaultKafkaConsumerFactory<>(configs); It looks like the configs variable does not refer to the properties created by consumerConfigs().

Answer

org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value. 

It looks like your new DefaultKafkaConsumerFactory<>(configs); does not use the map returned by consumerConfigs().
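One way to catch this early is to inspect the map that is actually handed to the factory. The sketch below uses only the JDK and the literal property names behind the ConsumerConfig constants. RequiredConsumerKeys and missingKeys are hypothetical names, and the list of required keys is an assumption chosen for this question, not Kafka's full validation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka or Spring.
class RequiredConsumerKeys {

    // Assumption: only the keys relevant to this question; Kafka validates more.
    static final List<String> REQUIRED = Arrays.asList(
            "bootstrap.servers", "key.deserializer", "value.deserializer");

    // Returns the required keys that are absent from the given map.
    static List<String> missingKeys(Map<String, Object> props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!props.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Same settings as consumerConfigs() in the question, written with literal keys.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Broker port; the comments in this thread report that 9092 works.
        props.put("bootstrap.servers", "192.168.56.101:9092");
        props.put("group.id", "group_id_coonfig");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", "100");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Stands in for a 'configs' field that was never populated.
        Map<String, Object> configs = new HashMap<>();
        System.out.println("missing from configs: " + missingKeys(configs));
        System.out.println("missing from consumerConfigs(): " + missingKeys(consumerConfigs()));
    }
}
```

In the test itself, the corresponding change would be to pass consumerConfigs() directly, as in new DefaultKafkaConsumerFactory<>(consumerConfigs()).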

On the other hand, KafkaMessageDrivenChannelAdapter does exactly this in its constructor:

this.messageListenerContainer = messageListenerContainer; 
this.messageListenerContainer.setAutoStartup(false); 
this.messageListenerContainer.setMessageListener(this.listener); 

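So your container.setMessageListener(new MessageListener<Integer, String>() { ... }) call has no effect: the adapter replaces that listener with its own. As a result, nothing will ever appear in records.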
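To make the overwrite concrete, here is a simplified JDK-only model. Container and Adapter are stand-ins written for this illustration, not the Spring classes; they only mimic the setMessageListener call quoted from the constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins; these are NOT the Spring classes.
class ListenerOverwriteDemo {

    static class Container {
        private Consumer<String> listener;

        void setMessageListener(Consumer<String> listener) {
            this.listener = listener; // the last caller wins
        }

        void deliver(String record) {
            if (listener != null) {
                listener.accept(record);
            }
        }
    }

    static class Adapter {
        final List<String> received = new ArrayList<>();

        Adapter(Container container) {
            // Mirrors the quoted constructor: the adapter installs its own listener.
            container.setMessageListener(received::add);
        }
    }

    // Returns {records seen by the test's listener, records seen by the adapter}.
    static int[] run() {
        List<String> userRecords = new ArrayList<>();
        Container container = new Container();
        container.setMessageListener(userRecords::add); // the test's listener
        Adapter adapter = new Adapter(container);        // replaces it
        container.deliver("hello");
        return new int[] { userRecords.size(), adapter.received.size() };
    }

    public static void main(String[] args) {
        int[] sizes = run();
        // prints "test listener saw 0, adapter saw 1"
        System.out.println("test listener saw " + sizes[0] + ", adapter saw " + sizes[1]);
    }
}
```

If the goal is only to collect ConsumerRecords in the test, one option to consider is starting the container directly and leaving the adapter out.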

If that is not clear yet, I suggest avoiding Spring Integration for this particular test.

For the KafkaMessageDrivenChannelAdapter variant, you have to specify the outputChannel as a QueueChannel so that you can retrieve messages by polling.

But you also have to do more BeanFactory setup around the KafkaMessageDrivenChannelAdapter.
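Poll-style retrieval has the same shape as draining a BlockingQueue with a timeout. The sketch below is JDK-only and does not use QueueChannel; BoundedPolling, drain, and its parameters are hypothetical. It also sidesteps a problem in the test's loop: take() blocks until an element arrives and never returns null, so while ((poll = records.take()) != null) cannot end on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for tests; plain JDK, no Spring types.
class BoundedPolling {

    // Collects up to 'expected' items, waiting at most 'timeoutMs' for each one.
    static <T> List<T> drain(BlockingQueue<T> queue, int expected, long timeoutMs)
            throws InterruptedException {
        List<T> out = new ArrayList<>();
        while (out.size() < expected) {
            // poll returns null when the timeout elapses, unlike take()
            T item = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (item == null) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();
        records.add("a");
        records.add("b");
        // Asks for 3 items, gets the 2 available, then gives up after the timeout.
        System.out.println(drain(records, 3, 200)); // prints [a, b]
    }
}
```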

See our test case for details: https://github.com/spring-projects/spring-integration-kafka/blob/master/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Also see the sample application based on Kafka 0.9: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/kafka

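Thanks for your valuable comment. I tried the same code and it works fine when I give the address with port 9092, which is the Kafka server port. But when I run the Kafka console consumer, I provide the ZooKeeper port 2181, so I am confused about this. – rahul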

Yes, that is correct: http://stackoverflow.com/questions/34935596/zookeeper-usage-on-kafka-0-9-0
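The distinction in these comments can be checked mechanically: the consumer created from Java takes broker addresses in bootstrap.servers, while the console consumer in this setup was pointed at ZooKeeper. Below is a small JDK-only sketch that flags a bootstrap list still pointing at port 2181. BootstrapCheck and suspiciousEntries are hypothetical names, and 2181 is only ZooKeeper's conventional default port, so treat this as a heuristic rather than a rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sanity check, not part of Kafka.
class BootstrapCheck {

    // Assumption: ZooKeeper runs on its conventional default port.
    static final String ZOOKEEPER_DEFAULT_PORT = "2181";

    // Returns the entries of a comma-separated host:port list whose port is 2181.
    static List<String> suspiciousEntries(String bootstrapServers) {
        List<String> suspicious = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                continue; // no port given, nothing to check
            }
            if (trimmed.substring(colon + 1).equals(ZOOKEEPER_DEFAULT_PORT)) {
                suspicious.add(trimmed);
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        System.out.println(suspiciousEntries("192.168.56.101:2181")); // prints [192.168.56.101:2181]
        System.out.println(suspiciousEntries("192.168.56.101:9092")); // prints []
    }
}
```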