2015-09-03 47 views
1

我試圖發送使用Spring集成卡夫卡V1.2.1基本字符串的有效載荷,但它與下面的異常失敗:Spring集成卡夫卡 - 發送一個基本的字符串

2015-09-03 11:50:39.729 ERROR 14418 --- [task-executor-3] [         ] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) 
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer 

我的XML配置如下所示:

<task:executor id="schedule-request-task-executor" pool-size="5" keep-alive="120" queue-capacity="125"/> 

<bean id="kafkaStringSerializer" class="org.apache.kafka.common.serialization.StringSerializer"/> 

<int-kafka:producer-context id="schedule-request-producer-context"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration topic="schedule.requests" 
              key-serializer="kafkaStringSerializer" 
              value-serializer="kafkaStringSerializer" 
              broker-list="${kafka.brokers}"/> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

<int-kafka:outbound-channel-adapter 
     kafka-producer-context-ref="schedule-request-producer-context" 
     channel="schedule-request-channel"> 
    <int:poller receive-timeout="0" 
       fixed-delay="100" time-unit="MILLISECONDS" 
       task-executor="schedule-request-task-executor"/> 
</int-kafka:outbound-channel-adapter> 

而且我用下面的代碼發送消息:

Message message = MessageBuilder.withPayload("PAYLOAD") 
       .setHeader("messageKey", "KEY") 
       .setHeader("topic", "schedule.requests") 
       .build(); 
scheduleRequestChannel.send(message); 

我看了一下https://github.com/spring-projects/spring-integration-extensions/blob/master/samples/的樣品,但是這些看起來已經過時了。

+0

我調試了這個,看起來ProducerRecord中的值在由於某種原因傳遞給序列化程序時轉換爲byte []。不知道爲什麼。 –

回答

2

調試SI和Kafka類後,我發現這是因爲Spring集成將字符串轉換爲byte [],除非在Producer配置中指定了key-class-typevalue-class-type

這是更新的配置,以防有人感興趣。

<int-kafka:producer-context id="schedule-request-producer-context"> 
     <int-kafka:producer-configurations> 
      <int-kafka:producer-configuration topic="schedule.requests" 
               key-class-type="java.lang.String" 
               key-serializer="kafkaStringSerializer" 
               value-class-type="java.lang.String" 
               value-serializer="kafkaStringSerializer" 
               broker-list="${kafka.brokers}"/> 
相關問題